]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Adapt bvfs for SQLite3
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32
33 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
34
35 #include "cats.h"
36 #include "bdb_priv.h"
37 #include "sql_glue.h"
38 #include "lib/htable.h"
39 #include "bvfs.h"
40
41 #define dbglevel 10
42 #define dbglevel_sql 15
43
44 static int result_handler(void *ctx, int fields, char **row)
45 {
46    if (fields == 4) {
47       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
48             row[0], row[1], row[2], row[3]);
49    } else if (fields == 5) {
50       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
51             row[0], row[1], row[2], row[3], row[4]);
52    } else if (fields == 6) {
53       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
54             row[0], row[1], row[2], row[3], row[4], row[5]);
55    } else if (fields == 7) {
56       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
57             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
58    }
59    return 0;
60 }
61
62 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
63    jcr = j;
64    jcr->inc_use_count();
65    db = mdb;                 /* need to inc ref count */
66    jobids = get_pool_memory(PM_NAME);
67    prev_dir = get_pool_memory(PM_NAME);
68    pattern = get_pool_memory(PM_NAME);
69    *jobids = *prev_dir = *pattern = 0;
70    dir_filenameid = pwd_id = offset = 0;
71    see_copies = see_all_versions = false;
72    limit = 1000;
73    attr = new_attr(jcr);
74    list_entries = result_handler;
75    user_data = this;
76    username = NULL;
77 }
78
79 Bvfs::~Bvfs() {
80    free_pool_memory(jobids);
81    free_pool_memory(pattern);
82    free_pool_memory(prev_dir);
83    if (username) {
84       free(username);
85    }
86    free_attr(attr);
87    jcr->dec_use_count();
88 }
89
90 void Bvfs::filter_jobid()
91 {
92    if (!username) {
93       return;
94    }
95
96    /* Query used by Bweb to filter clients, activated when using
97     * set_username() 
98     */
99    POOL_MEM query;
100    Mmsg(query,
101       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) "
102         "JOIN (SELECT ClientId FROM client_group_member "
103         "JOIN client_group USING (client_group_id) "
104         "JOIN bweb_client_group_acl USING (client_group_id) "
105         "JOIN bweb_user USING (userid) "
106        "WHERE bweb_user.username = '%s' "
107       ") AS filter USING (ClientId) "
108         " WHERE JobId IN (%s)", 
109         username, jobids);
110
111    db_list_ctx ctx;
112    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
113    db_sql_query(db, query.c_str(), db_list_handler, &ctx);
114    pm_strcpy(jobids, ctx.list);
115 }
116
117 void Bvfs::set_jobid(JobId_t id)
118 {
119    Mmsg(jobids, "%lld", (uint64_t)id);
120    filter_jobid();
121 }
122
123 void Bvfs::set_jobids(char *ids)
124 {
125    pm_strcpy(jobids, ids);
126    filter_jobid();
127 }
128
129 /* 
130  * TODO: Find a way to let the user choose how he wants to display
131  * files and directories
132  */
133
134
135 /* 
136  * Working Object to store PathId already seen (avoid
137  * database queries), equivalent to %cache_ppathid in perl
138  */
139
140 #define NITEMS 50000
141 class pathid_cache {
142 private:
143    hlink *nodes;
144    int nb_node;
145    int max_node;
146
147    alist *table_node;
148
149    htable *cache_ppathid;
150
151 public:
152    pathid_cache() {
153       hlink link;
154       cache_ppathid = (htable *)malloc(sizeof(htable));
155       cache_ppathid->init(&link, &link, NITEMS);
156       max_node = NITEMS;
157       nodes = (hlink *) malloc(max_node * sizeof (hlink));
158       nb_node = 0;
159       table_node = New(alist(5, owned_by_alist));
160       table_node->append(nodes);
161    }
162
163    hlink *get_hlink() {
164       if (++nb_node >= max_node) {
165          nb_node = 0;
166          nodes = (hlink *)malloc(max_node * sizeof(hlink));
167          table_node->append(nodes);
168       }
169       return nodes + nb_node;
170    }
171
172    bool lookup(char *pathid) {
173       bool ret = cache_ppathid->lookup(pathid) != NULL;
174       return ret;
175    }
176    
177    void insert(char *pathid) {
178       hlink *h = get_hlink();
179       cache_ppathid->insert(pathid, h);
180    }
181
182    ~pathid_cache() {
183       cache_ppathid->destroy();
184       free(cache_ppathid);
185       delete table_node;
186    }
187 private:
188    pathid_cache(const pathid_cache &); /* prohibit pass by value */
189    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
190 } ;
191
192 /* Return the parent_dir with the trailing /  (update the given string)
193  * TODO: see in the rest of bacula if we don't have already this function
194  * dir=/tmp/toto/
195  * dir=/tmp/
196  * dir=/
197  * dir=
198  */
199 char *bvfs_parent_dir(char *path)
200 {
201    char *p = path;
202    int len = strlen(path) - 1;
203
204    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
205       path[len] = '\0';
206    }
207
208    if (len > 0) {
209       p += len;
210       while (p > path && !IsPathSeparator(*p)) {
211          p--;
212       }
213       p[1] = '\0';
214    }
215    return path;
216 }
217
218 /* Return the basename of the with the trailing /
219  * TODO: see in the rest of bacula if we don't have
220  * this function already
221  */
222 char *bvfs_basename_dir(char *path)
223 {
224    char *p = path;
225    int len = strlen(path) - 1;
226
227    if (path[len] == '/') {      /* if directory, skip last / */
228       len -= 1;
229    }
230
231    if (len > 0) {
232       p += len;
233       while (p > path && !IsPathSeparator(*p)) {
234          p--;
235       }
236       if (*p == '/') {
237          p++;                  /* skip first / */
238       }
239    } 
240    return p;
241 }
242
243 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
244                                  pathid_cache &ppathid_cache, 
245                                  char *org_pathid, char *path)
246 {
247    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
248    char pathid[50];
249    ATTR_DBR parent;
250    char *bkp = mdb->path;
251    strncpy(pathid, org_pathid, sizeof(pathid));
252
253    /* Does the ppathid exist for this ? we use a memory cache...  In order to
254     * avoid the full loop, we consider that if a dir is allready in the
255     * PathHierarchy table, then there is no need to calculate all the
256     * hierarchy
257     */
258    while (path && *path)
259    {
260       if (!ppathid_cache.lookup(pathid))
261       {
262          Mmsg(mdb->cmd, 
263               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
264               pathid);
265
266          QUERY_DB(jcr, mdb, mdb->cmd);
267          /* Do we have a result ? */
268          if (sql_num_rows(mdb) > 0) {
269             ppathid_cache.insert(pathid);
270             /* This dir was in the db ...
271              * It means we can leave, the tree has allready been built for
272              * this dir
273              */
274             goto bail_out;
275          } else {
276             /* search or create parent PathId in Path table */
277             mdb->path = bvfs_parent_dir(path);
278             mdb->pnl = strlen(mdb->path);
279             if (!db_create_path_record(jcr, mdb, &parent)) {
280                goto bail_out;
281             }
282             ppathid_cache.insert(pathid);
283             
284             Mmsg(mdb->cmd,
285                  "INSERT INTO PathHierarchy (PathId, PPathId) "
286                  "VALUES (%s,%lld)",
287                  pathid, (uint64_t) parent.PathId);
288             
289             INSERT_DB(jcr, mdb, mdb->cmd);
290
291             edit_uint64(parent.PathId, pathid);
292             path = mdb->path;   /* already done */
293          }
294       } else {
295          /* It's already in the cache.  We can leave, no time to waste here,
296           * all the parent dirs have allready been done
297           */
298          goto bail_out;
299       }
300    }   
301
302 bail_out:
303    mdb->path = bkp;
304    mdb->fnl = 0;
305 }
306
307 /* 
308  * Internal function to update path_hierarchy cache with a shared pathid cache
309  */
310 static void update_path_hierarchy_cache(JCR *jcr,
311                                         B_DB *mdb,
312                                         pathid_cache &ppathid_cache,
313                                         JobId_t JobId)
314 {
315    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
316
317    uint32_t num;
318    char jobid[50];
319    edit_uint64(JobId, jobid);
320  
321    db_lock(mdb);
322    db_start_transaction(jcr, mdb);
323
324    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
325    
326    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
327       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
328       goto bail_out;
329    }
330
331    /* Inserting path records for JobId */
332    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
333                    "SELECT DISTINCT PathId, JobId "
334                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
335                            "UNION "
336                            "SELECT PathId, BaseFiles.JobId "
337                              "FROM BaseFiles JOIN File AS F USING (FileId) "
338                             "WHERE BaseFiles.JobId = %s) AS B",
339         jobid, jobid);
340    QUERY_DB(jcr, mdb, mdb->cmd);
341
342    /* Now we have to do the directory recursion stuff to determine missing
343     * visibility We try to avoid recursion, to be as fast as possible We also
344     * only work on not allready hierarchised directories...
345     */
346    Mmsg(mdb->cmd, 
347      "SELECT PathVisibility.PathId, Path "
348        "FROM PathVisibility "
349             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
350             "LEFT JOIN PathHierarchy "
351          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
352       "WHERE PathVisibility.JobId = %s "
353         "AND PathHierarchy.PathId IS NULL "
354       "ORDER BY Path", jobid);
355    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
356    QUERY_DB(jcr, mdb, mdb->cmd);
357
358    /* TODO: I need to reuse the DB connection without emptying the result 
359     * So, now i'm copying the result in memory to be able to query the
360     * catalog descriptor again.
361     */
362    num = sql_num_rows(mdb);
363    if (num > 0) {
364       char **result = (char **)malloc (num * 2 * sizeof(char *));
365       
366       SQL_ROW row;
367       int i=0;
368       while((row = sql_fetch_row(mdb))) {
369          result[i++] = bstrdup(row[0]);
370          result[i++] = bstrdup(row[1]);
371       }
372       
373       i=0;
374       while (num > 0) {
375          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
376          free(result[i++]);
377          free(result[i++]);
378          num--;
379       }
380       free(result);
381    }
382
383    Mmsg(mdb->cmd, 
384   "INSERT INTO PathVisibility (PathId, JobId)  "
385    "SELECT a.PathId,%s "
386    "FROM ( "
387      "SELECT DISTINCT h.PPathId AS PathId "
388        "FROM PathHierarchy AS h "
389        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
390       "WHERE p.JobId=%s) AS a LEFT JOIN "
391        "(SELECT PathId "
392           "FROM PathVisibility "
393          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
394    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
395
396    do {
397       QUERY_DB(jcr, mdb, mdb->cmd);
398    } while (sql_affected_rows(mdb) > 0);
399    
400    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
401    UPDATE_DB(jcr, mdb, mdb->cmd);
402
403 bail_out:
404    db_end_transaction(jcr, mdb);
405    db_unlock(mdb);
406 }
407
408 /* 
409  * Find an store the filename descriptor for empty directories Filename.Name=''
410  */
411 DBId_t Bvfs::get_dir_filenameid()
412 {
413    uint32_t id;
414    if (dir_filenameid) {
415       return dir_filenameid;
416    }
417    POOL_MEM q;
418    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
419    db_sql_query(db, q.c_str(), db_int_handler, &id);
420    dir_filenameid = id;
421    return dir_filenameid;
422 }
423
424 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
425 {
426    uint32_t nb=0;
427    db_list_ctx jobids_list;
428
429    db_lock(mdb);
430    db_start_transaction(jcr, mdb);
431
432 #ifdef xxx
433    /* TODO: Remove this code when updating make_bacula_table script */
434    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
435    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
436       Dmsg0(dbglevel, "Creating cache table\n");
437       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
438       QUERY_DB(jcr, mdb, mdb->cmd);
439
440       Mmsg(mdb->cmd,
441            "CREATE TABLE PathHierarchy ( "
442            "PathId integer NOT NULL, "
443            "PPathId integer NOT NULL, "
444            "CONSTRAINT pathhierarchy_pkey "
445            "PRIMARY KEY (PathId))");
446       QUERY_DB(jcr, mdb, mdb->cmd); 
447
448       Mmsg(mdb->cmd,
449            "CREATE INDEX pathhierarchy_ppathid "
450            "ON PathHierarchy (PPathId)");
451       QUERY_DB(jcr, mdb, mdb->cmd);
452
453       Mmsg(mdb->cmd, 
454            "CREATE TABLE PathVisibility ("
455            "PathId integer NOT NULL, "
456            "JobId integer NOT NULL, "
457            "Size int8 DEFAULT 0, "
458            "Files int4 DEFAULT 0, "
459            "CONSTRAINT pathvisibility_pkey "
460            "PRIMARY KEY (JobId, PathId))");
461       QUERY_DB(jcr, mdb, mdb->cmd);
462
463       Mmsg(mdb->cmd, 
464            "CREATE INDEX pathvisibility_jobid "
465            "ON PathVisibility (JobId)");
466       QUERY_DB(jcr, mdb, mdb->cmd);
467
468    }
469 #endif
470
471    Mmsg(mdb->cmd, 
472  "SELECT JobId from Job "
473   "WHERE HasCache = 0 "
474     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
475   "ORDER BY JobId");
476
477    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
478
479    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
480
481    db_end_transaction(jcr, mdb);
482    db_start_transaction(jcr, mdb);
483    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
484    Mmsg(mdb->cmd, 
485         "DELETE FROM PathVisibility "
486          "WHERE NOT EXISTS "
487         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
488    nb = DELETE_DB(jcr, mdb, mdb->cmd);
489    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
490
491    db_end_transaction(jcr, mdb);
492    db_unlock(mdb);
493 }
494
495 /*
496  * Update the bvfs cache for given jobids (1,2,3,4)
497  */
498 void
499 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
500 {
501    pathid_cache ppathid_cache;
502    JobId_t JobId;
503    char *p;
504
505    for (p=jobids; ; ) {
506       int stat = get_next_jobid_from_list(&p, &JobId);
507       if (stat < 0) {
508          return;
509       }
510       if (stat == 0) {
511          break;
512       }
513       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
514       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
515    }
516 }
517
518 /* 
519  * Update the bvfs cache for current jobids
520  */
521 void Bvfs::update_cache()
522 {
523    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
524 }
525
526 /* Change the current directory, returns true if the path exists */
527 bool Bvfs::ch_dir(const char *path)
528 {
529    pm_strcpy(db->path, path);
530    db->pnl = strlen(db->path);
531    ch_dir(db_get_path_record(jcr, db)); 
532    return pwd_id != 0;
533 }
534
535 /* 
536  * Get all file versions for a specified client
537  * TODO: Handle basejobs using different client
538  */
539 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
540 {
541    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
542          (uint64_t)fnid, client);
543    char ed1[50], ed2[50];
544    POOL_MEM q;
545    if (see_copies) {
546       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
547    } else {
548       Mmsg(q, " AND Job.Type = 'B' ");
549    }
550
551    POOL_MEM query;
552
553    Mmsg(query,//    1           2              3       
554 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
555 //         4          5           6
556         "File.JobId, File.LStat, File.FileId, "
557 //         7                    8
558        "Media.VolumeName, Media.InChanger "
559 "FROM File, Job, Client, JobMedia, Media "
560 "WHERE File.FilenameId = %s "
561   "AND File.PathId=%s "
562   "AND File.JobId = Job.JobId "
563   "AND Job.JobId = JobMedia.JobId "
564   "AND File.FileIndex >= JobMedia.FirstIndex "
565   "AND File.FileIndex <= JobMedia.LastIndex "
566   "AND JobMedia.MediaId = Media.MediaId "
567   "AND Job.ClientId = Client.ClientId "
568   "AND Client.Name = '%s' "
569   "%s ORDER BY FileId LIMIT %d OFFSET %d"
570         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
571         limit, offset);
572    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
573    db_sql_query(db, query.c_str(), list_entries, user_data);
574 }
575
576 DBId_t Bvfs::get_root()
577 {
578    *db->path = 0;
579    return db_get_path_record(jcr, db);
580 }
581
582 static int path_handler(void *ctx, int fields, char **row)
583 {
584    Bvfs *fs = (Bvfs *) ctx;
585    return fs->_handle_path(ctx, fields, row);
586 }
587
588 int Bvfs::_handle_path(void *ctx, int fields, char **row)
589 {
590    if (bvfs_is_dir(row)) {
591       /* can have the same path 2 times */
592       if (strcmp(row[BVFS_Name], prev_dir)) {
593          pm_strcpy(prev_dir, row[BVFS_Name]);
594          return list_entries(user_data, fields, row);
595       }
596    }
597    return 0;
598 }
599
600 /* 
601  * Retrieve . and .. information
602  */
603 void Bvfs::ls_special_dirs()
604 {
605    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
606    char ed1[50], ed2[50];
607    if (*jobids == 0) {
608       return;
609    }
610    if (!dir_filenameid) {
611       get_dir_filenameid();
612    }
613
614    /* Will fetch directories  */
615    *prev_dir = 0;
616
617    POOL_MEM query;
618    Mmsg(query, 
619 "(SELECT PPathId AS PathId, '..' AS Path "
620     "FROM  PathHierarchy "
621    "WHERE  PathId = %s "
622 "UNION "
623  "SELECT %s AS PathId, '.' AS Path)",
624         edit_uint64(pwd_id, ed1), ed1);
625
626    POOL_MEM query2;
627    Mmsg(query2,// 1      2     3        4     5       6
628 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
629   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
630        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
631               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
632        "WHERE File1.FilenameId = %s "
633        "AND File1.JobId IN (%s)) AS listfile1 "
634   "ON (tmp.PathId = listfile1.PathId) "
635   "ORDER BY tmp.Path, JobId DESC ",
636         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
637
638    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
639    db_sql_query(db, query2.c_str(), path_handler, this);
640 }
641
642 /* Returns true if we have dirs to read */
643 bool Bvfs::ls_dirs()
644 {
645    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
646    char ed1[50], ed2[50];
647    if (*jobids == 0) {
648       return false;
649    }
650
651    POOL_MEM query;
652    POOL_MEM filter;
653    if (*pattern) {
654       int len = strlen(pattern);
655       query.check_size(len*2+1);
656       db_escape_string(jcr, db, query.c_str(), pattern, len);
657       Mmsg(filter, " AND Path2.Path %s '%s' ", match_query[db_get_type_index(db)], query.c_str());
658    }
659
660    if (!dir_filenameid) {
661       get_dir_filenameid();
662    }
663
664    /* the sql query displays same directory multiple time, take the first one */
665    *prev_dir = 0;
666
667    /* Let's retrieve the list of the visible dirs in this dir ...
668     * First, I need the empty filenameid to locate efficiently
669     * the dirs in the file table
670     * my $dir_filenameid = $self->get_dir_filenameid();
671     */
672    /* Then we get all the dir entries from File ... */
673    Mmsg(query,
674 //       0     1     2   3      4     5       6
675 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
676     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
677            "lower(Path1.Path) AS lpath, "
678            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
679            "listfile1.FileId AS FileId "
680     "FROM ( "
681       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
682       "FROM PathHierarchy AS PathHierarchy1 "
683       "JOIN Path AS Path2 "
684         "ON (PathHierarchy1.PathId = Path2.PathId) "
685       "JOIN PathVisibility AS PathVisibility1 "
686         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
687       "WHERE PathHierarchy1.PPathId = %s "
688       "AND PathVisibility1.JobId IN (%s) "
689            "%s "
690      ") AS listpath1 "
691    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
692
693    "LEFT JOIN ( " /* get attributes if any */
694        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
695               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
696        "WHERE File1.FilenameId = %s "
697        "AND File1.JobId IN (%s)) AS listfile1 "
698        "ON (listpath1.PathId = listfile1.PathId) "
699     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
700         edit_uint64(pwd_id, ed1),
701         jobids,
702         filter.c_str(),
703         edit_uint64(dir_filenameid, ed2),
704         jobids,
705         limit, offset);
706
707    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
708
709    db_lock(db);
710    db_sql_query(db, query.c_str(), path_handler, this);
711    nb_record = sql_num_rows(db);
712    db_unlock(db);
713
714    return nb_record == limit;
715 }
716
717 void build_ls_files_query(B_DB *db, POOL_MEM &query, 
718                           const char *JobId, const char *PathId,  
719                           const char *filter, int64_t limit, int64_t offset)
720 {
721    if (db_get_type_index(db) == SQL_TYPE_POSTGRESQL) {
722       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
723            JobId, PathId, JobId, PathId, 
724            filter, limit, offset);
725    } else {
726       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
727            JobId, PathId, JobId, PathId, 
728            limit, offset, filter, JobId, JobId);
729    }
730 }
731
732 /* Returns true if we have files to read */
733 bool Bvfs::ls_files()
734 {
735    POOL_MEM query;
736    POOL_MEM filter;
737    char pathid[50];
738
739    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
740    if (*jobids == 0) {
741       return false;
742    }
743
744    if (!pwd_id) {
745       ch_dir(get_root());
746    }
747
748    edit_uint64(pwd_id, pathid);
749    if (*pattern) {
750       int len = strlen(pattern);
751       query.check_size(len*2+1);
752       db_escape_string(jcr, db, query.c_str(), pattern, len);
753       Mmsg(filter, " AND Filename.Name %s '%s' ", match_query[db_get_type_index(db)], query.c_str());
754    }
755
756    build_ls_files_query(db, query, 
757                         jobids, pathid, filter.c_str(),
758                         limit, offset);
759
760    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
761
762    db_lock(db);
763    db_sql_query(db, query.c_str(), list_entries, user_data);
764    nb_record = sql_num_rows(db);
765    db_unlock(db);
766
767    return nb_record == limit;
768 }
769
770
771 /* 
772  * Return next Id from comma separated list   
773  *
774  * Returns:
775  *   1 if next Id returned
776  *   0 if no more Ids are in list
777  *  -1 there is an error
778  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
779  */
780 static int get_next_id_from_list(char **p, int64_t *Id)
781 {
782    const int maxlen = 30;
783    char id[maxlen+1];
784    char *q = *p;
785
786    id[0] = 0;
787    for (int i=0; i<maxlen; i++) {
788       if (*q == 0) {
789          break;
790       } else if (*q == ',') {
791          q++;
792          break;
793       }
794       id[i] = *q++;
795       id[i+1] = 0;
796    }
797    if (id[0] == 0) {
798       return 0;
799    } else if (!is_a_number(id)) {
800       return -1;                      /* error */
801    }
802    *p = q;
803    *Id = str_to_int64(id);
804    return 1;
805 }
806
807 static int get_path_handler(void *ctx, int fields, char **row)
808 {
809    POOL_MEM *buf = (POOL_MEM *) ctx;
810    pm_strcpy(*buf, row[0]);
811    return 0;
812 }
813
814 static bool check_temp(char *output_table)
815 {
816    if (output_table[0] == 'b' &&
817        output_table[1] == '2' &&
818        is_an_integer(output_table + 2))
819    {
820       return true;
821    }
822    return false;
823 }
824
825 bool Bvfs::drop_restore_list(char *output_table)
826 {
827    POOL_MEM query;
828    if (check_temp(output_table)) {
829       Mmsg(query, "DROP TABLE %s", output_table);
830       db_sql_query(db, query.c_str(), NULL, NULL);
831       return true;
832    }
833    return false;
834 }
835
836 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
837                                 char *output_table)
838 {
839    POOL_MEM query;
840    POOL_MEM tmp, tmp2;
841    int64_t id, jobid;
842    bool init=false;
843    bool ret=false;
844    /* check args */
845    if ((*fileid   && !is_a_number_list(fileid))  ||
846        (*dirid    && !is_a_number_list(dirid))   ||
847        (*hardlink && !is_a_number_list(hardlink))||
848        (!*hardlink && !*fileid && !*dirid && !*hardlink))
849    {
850       return false;
851    }
852    if (!check_temp(output_table)) {
853       return false;
854    }
855
856    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
857
858    if (*fileid) {               /* Select files with their direct id */
859       init=true;
860       Mmsg(tmp,"SELECT JobId, JobTDate, FileIndex, FilenameId, PathId, FileId "
861                   "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
862            fileid);
863       pm_strcat(query, tmp.c_str());
864    }
865
866    /* Add a directory content */
867    while (get_next_id_from_list(&dirid, &id) == 1) {
868       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
869       
870       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
871          Dmsg0(dbglevel, "Can't search for path\n");
872          /* print error */
873          return false;
874       }
875       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
876          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
877                id, tmp.c_str(), tmp2.c_str());
878          break;
879       }
880       /* escape % and _ for LIKE search */
881       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
882       char *p = tmp.c_str();
883       for (char *s = tmp2.c_str(); *s ; s++) {
884          if (*s == '%' || *s == '_' || *s == '\\') {
885             *p = '\\'; 
886             p++;
887          }
888          *p = *s; 
889          p++;
890       }
891       *p = '\0';
892       tmp.strcat("%");
893
894       size_t len = strlen(tmp.c_str());
895       tmp2.check_size((len+1) * 2);
896       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
897
898       if (init) {
899          query.strcat(" UNION ");
900       }
901
902       Mmsg(tmp, "SELECT JobId, JobTDate, File.FileIndex, File.FilenameId, "
903                         "File.PathId, FileId "
904                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
905                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ", 
906            tmp2.c_str(), jobids); 
907       query.strcat(tmp.c_str());
908       init = true;
909
910       query.strcat(" UNION ");
911
912       /* A directory can have files from a BaseJob */
913       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
914                         "File.FilenameId, File.PathId, BaseFiles.FileId "
915                    "FROM BaseFiles "
916                         "JOIN File USING (FileId) "
917                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
918                         "JOIN Path USING (PathId) "
919                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ", 
920            tmp2.c_str(), jobids); 
921       query.strcat(tmp.c_str());
922    }
923
924    /* expect jobid,fileindex */
925    int64_t prev_jobid=0;
926    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
927       if (get_next_id_from_list(&hardlink, &id) != 1) {
928          Dmsg0(dbglevel, "hardlink should be two by two\n");
929          return false;
930       }
931       if (jobid != prev_jobid) { /* new job */
932          if (prev_jobid == 0) {  /* first jobid */
933             if (init) {
934                query.strcat(" UNION ");
935             }
936          } else {               /* end last job, start new one */
937             tmp.strcat(") UNION ");
938             query.strcat(tmp.c_str());
939          }
940          Mmsg(tmp,   "SELECT JobId, JobTDate, FileIndex, FilenameId, "
941                             "PathId, FileId "
942                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld " 
943                         "AND FileIndex IN (%lld", jobid, id);
944          prev_jobid = jobid;
945
946       } else {                  /* same job, add new findex */
947          Mmsg(tmp2, ", %lld", id);
948          tmp.strcat(tmp2.c_str());
949       }
950    }
951
952    if (prev_jobid != 0) {       /* end last job */
953       tmp.strcat(") ");
954       query.strcat(tmp.c_str());
955       init = true;
956    }
957
958    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
959
960    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
961       Dmsg0(dbglevel, "Can't execute q\n");
962       goto bail_out;
963    }
964
965    /* TODO: handle basejob and SQLite3 */
966    Mmsg(query, sql_bvfs_select[db_get_type_index(db)], output_table, output_table);
967
968    /* TODO: handle jobid filter */
969    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
970    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
971       Dmsg0(dbglevel, "Can't execute q\n");
972       goto bail_out;
973    }
974
975    /* MySQL need it */
976    if (db_get_type_index(db) == SQL_TYPE_MYSQL) {
977       Mmsg(query, "CREATE INDEX idx_%s ON b2%s (JobId)", 
978            output_table, output_table);
979    }
980
981    ret = true;
982
983 bail_out:
984    Mmsg(query, "DROP TABLE btemp%s", output_table);
985    db_sql_query(db, query.c_str(), NULL, NULL);
986    return ret;
987 }
988
989 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI */