]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
Tweak basejobs + bvfs
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats/cats.h"
33 #include "lib/htable.h"
34 #include "bvfs.h"
35
36 #define dbglevel 10
37 #define dbglevel_sql 15
38
39 static int result_handler(void *ctx, int fields, char **row)
40 {
41    if (fields == 4) {
42       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
43             row[0], row[1], row[2], row[3]);
44    } else if (fields == 5) {
45       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3], row[4]);
47    } else if (fields == 6) {
48       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4], row[5]);
50    } else if (fields == 7) {
51       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
53    }
54    return 0;
55 }
56
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
58    jcr = j;
59    jcr->inc_use_count();
60    db = mdb;                 /* need to inc ref count */
61    jobids = get_pool_memory(PM_NAME);
62    prev_dir = get_pool_memory(PM_NAME);
63    pattern = get_pool_memory(PM_NAME);
64    *jobids = *prev_dir = *pattern = 0;
65    dir_filenameid = pwd_id = offset = 0;
66    see_copies = see_all_versions = false;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71 }
72
73 Bvfs::~Bvfs() {
74    free_pool_memory(jobids);
75    free_pool_memory(pattern);
76    free_pool_memory(prev_dir);
77    free_attr(attr);
78    jcr->dec_use_count();
79 }
80
81 /* 
82  * TODO: Find a way to let the user choose how he wants to display
83  * files and directories
84  */
85
86
87 /* 
88  * Working Object to store PathId already seen (avoid
89  * database queries), equivalent to %cache_ppathid in perl
90  */
91
92 #define NITEMS 50000
93 class pathid_cache {
94 private:
95    hlink *nodes;
96    int nb_node;
97    int max_node;
98
99    alist *table_node;
100
101    htable *cache_ppathid;
102
103 public:
104    pathid_cache() {
105       hlink link;
106       cache_ppathid = (htable *)malloc(sizeof(htable));
107       cache_ppathid->init(&link, &link, NITEMS);
108       max_node = NITEMS;
109       nodes = (hlink *) malloc(max_node * sizeof (hlink));
110       nb_node = 0;
111       table_node = New(alist(5, owned_by_alist));
112       table_node->append(nodes);
113    }
114
115    hlink *get_hlink() {
116       if (++nb_node >= max_node) {
117          nb_node = 0;
118          nodes = (hlink *)malloc(max_node * sizeof(hlink));
119          table_node->append(nodes);
120       }
121       return nodes + nb_node;
122    }
123
124    bool lookup(char *pathid) {
125       bool ret = cache_ppathid->lookup(pathid) != NULL;
126       return ret;
127    }
128    
129    void insert(char *pathid) {
130       hlink *h = get_hlink();
131       cache_ppathid->insert(pathid, h);
132    }
133
134    ~pathid_cache() {
135       cache_ppathid->destroy();
136       free(cache_ppathid);
137       delete table_node;
138    }
139 private:
140    pathid_cache(const pathid_cache &); /* prohibit pass by value */
141    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
142 } ;
143
144 /* Return the parent_dir with the trailing /  (update the given string)
145  * TODO: see in the rest of bacula if we don't have already this function
146  * dir=/tmp/toto/
147  * dir=/tmp/
148  * dir=/
149  * dir=
150  */
151 char *bvfs_parent_dir(char *path)
152 {
153    char *p = path;
154    int len = strlen(path) - 1;
155
156    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
157       path[len] = '\0';
158    }
159
160    if (len > 0) {
161       p += len;
162       while (p > path && !IsPathSeparator(*p)) {
163          p--;
164       }
165       p[1] = '\0';
166    }
167    return path;
168 }
169
170 /* Return the basename of the with the trailing /
171  * TODO: see in the rest of bacula if we don't have
172  * this function already
173  */
174 char *bvfs_basename_dir(char *path)
175 {
176    char *p = path;
177    int len = strlen(path) - 1;
178
179    if (path[len] == '/') {      /* if directory, skip last / */
180       len -= 1;
181    }
182
183    if (len > 0) {
184       p += len;
185       while (p > path && !IsPathSeparator(*p)) {
186          p--;
187       }
188       if (*p == '/') {
189          p++;                  /* skip first / */
190       }
191    } 
192    return p;
193 }
194
195 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
196                                  pathid_cache &ppathid_cache, 
197                                  char *org_pathid, char *path)
198 {
199    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
200    char pathid[50];
201    ATTR_DBR parent;
202    char *bkp = mdb->path;
203    strncpy(pathid, org_pathid, sizeof(pathid));
204
205    /* Does the ppathid exist for this ? we use a memory cache...  In order to
206     * avoid the full loop, we consider that if a dir is allready in the
207     * PathHierarchy table, then there is no need to calculate all the
208     * hierarchy
209     */
210    while (path && *path)
211    {
212       if (!ppathid_cache.lookup(pathid))
213       {
214          Mmsg(mdb->cmd, 
215               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
216               pathid);
217
218          QUERY_DB(jcr, mdb, mdb->cmd);
219          /* Do we have a result ? */
220          if (sql_num_rows(mdb) > 0) {
221             ppathid_cache.insert(pathid);
222             /* This dir was in the db ...
223              * It means we can leave, the tree has allready been built for
224              * this dir
225              */
226             goto bail_out;
227          } else {
228             /* search or create parent PathId in Path table */
229             mdb->path = bvfs_parent_dir(path);
230             mdb->pnl = strlen(mdb->path);
231             if (!db_create_path_record(jcr, mdb, &parent)) {
232                goto bail_out;
233             }
234             ppathid_cache.insert(pathid);
235             
236             Mmsg(mdb->cmd,
237                  "INSERT INTO PathHierarchy (PathId, PPathId) "
238                  "VALUES (%s,%lld)",
239                  pathid, (uint64_t) parent.PathId);
240             
241             INSERT_DB(jcr, mdb, mdb->cmd);
242
243             edit_uint64(parent.PathId, pathid);
244             path = mdb->path;   /* already done */
245          }
246       } else {
247          /* It's already in the cache.  We can leave, no time to waste here,
248           * all the parent dirs have allready been done
249           */
250          goto bail_out;
251       }
252    }   
253
254 bail_out:
255    mdb->path = bkp;
256    mdb->fnl = 0;
257 }
258
259 /* 
260  * Internal function to update path_hierarchy cache with a shared pathid cache
261  */
262 static void update_path_hierarchy_cache(JCR *jcr,
263                                         B_DB *mdb,
264                                         pathid_cache &ppathid_cache,
265                                         JobId_t JobId)
266 {
267    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
268
269    uint32_t num;
270    char jobid[50];
271    edit_uint64(JobId, jobid);
272  
273    db_lock(mdb);
274    db_start_transaction(jcr, mdb);
275
276    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
277    
278    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
279       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
280       goto bail_out;
281    }
282
283    /* Inserting path records for JobId */
284    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
285                    "SELECT DISTINCT PathId, JobId "
286                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
287                            "UNION "
288                            "SELECT PathId, BaseFiles.JobId FROM BaseFiles JOIN File AS F USING (FileId) "
289                             "WHERE BaseFiles.JobId = %s) AS B",
290         jobid, jobid);
291    QUERY_DB(jcr, mdb, mdb->cmd);
292
293    /* Now we have to do the directory recursion stuff to determine missing
294     * visibility We try to avoid recursion, to be as fast as possible We also
295     * only work on not allready hierarchised directories...
296     */
297    Mmsg(mdb->cmd, 
298      "SELECT PathVisibility.PathId, Path "
299        "FROM PathVisibility "
300             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
301             "LEFT JOIN PathHierarchy "
302          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
303       "WHERE PathVisibility.JobId = %s "
304         "AND PathHierarchy.PathId IS NULL "
305       "ORDER BY Path", jobid);
306    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
307    QUERY_DB(jcr, mdb, mdb->cmd);
308
309    /* TODO: I need to reuse the DB connection without emptying the result 
310     * So, now i'm copying the result in memory to be able to query the
311     * catalog descriptor again.
312     */
313    num = sql_num_rows(mdb);
314    if (num > 0) {
315       char **result = (char **)malloc (num * 2 * sizeof(char *));
316       
317       SQL_ROW row;
318       int i=0;
319       while((row = sql_fetch_row(mdb))) {
320          result[i++] = bstrdup(row[0]);
321          result[i++] = bstrdup(row[1]);
322       }
323       
324       i=0;
325       while (num > 0) {
326          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
327          free(result[i++]);
328          free(result[i++]);
329          num--;
330       }
331       free(result);
332    }
333
334    Mmsg(mdb->cmd, 
335   "INSERT INTO PathVisibility (PathId, JobId)  "
336    "SELECT a.PathId,%s "
337    "FROM ( "
338      "SELECT DISTINCT h.PPathId AS PathId "
339        "FROM PathHierarchy AS h "
340        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
341       "WHERE p.JobId=%s) AS a LEFT JOIN "
342        "(SELECT PathId "
343           "FROM PathVisibility "
344          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
345    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
346
347    do {
348       QUERY_DB(jcr, mdb, mdb->cmd);
349    } while (sql_affected_rows(mdb) > 0);
350    
351    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
352    UPDATE_DB(jcr, mdb, mdb->cmd);
353
354 bail_out:
355    db_end_transaction(jcr, mdb);
356    db_unlock(mdb);
357 }
358
359 /* 
360  * Find an store the filename descriptor for empty directories Filename.Name=''
361  */
362 DBId_t Bvfs::get_dir_filenameid()
363 {
364    uint32_t id;
365    if (dir_filenameid) {
366       return dir_filenameid;
367    }
368    POOL_MEM q;
369    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
370    db_sql_query(db, q.c_str(), db_int_handler, &id);
371    dir_filenameid = id;
372    return dir_filenameid;
373 }
374
375 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
376 {
377    uint32_t nb=0;
378    db_list_ctx jobids_list;
379
380    db_lock(mdb);
381    db_start_transaction(jcr, mdb);
382
383 #ifdef xxx
384    /* TODO: Remove this code when updating make_bacula_table script */
385    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
386    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
387       Dmsg0(dbglevel, "Creating cache table\n");
388       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
389       QUERY_DB(jcr, mdb, mdb->cmd);
390
391       Mmsg(mdb->cmd,
392            "CREATE TABLE PathHierarchy ( "
393            "PathId integer NOT NULL, "
394            "PPathId integer NOT NULL, "
395            "CONSTRAINT pathhierarchy_pkey "
396            "PRIMARY KEY (PathId))");
397       QUERY_DB(jcr, mdb, mdb->cmd); 
398
399       Mmsg(mdb->cmd,
400            "CREATE INDEX pathhierarchy_ppathid "
401            "ON PathHierarchy (PPathId)");
402       QUERY_DB(jcr, mdb, mdb->cmd);
403
404       Mmsg(mdb->cmd, 
405            "CREATE TABLE PathVisibility ("
406            "PathId integer NOT NULL, "
407            "JobId integer NOT NULL, "
408            "Size int8 DEFAULT 0, "
409            "Files int4 DEFAULT 0, "
410            "CONSTRAINT pathvisibility_pkey "
411            "PRIMARY KEY (JobId, PathId))");
412       QUERY_DB(jcr, mdb, mdb->cmd);
413
414       Mmsg(mdb->cmd, 
415            "CREATE INDEX pathvisibility_jobid "
416            "ON PathVisibility (JobId)");
417       QUERY_DB(jcr, mdb, mdb->cmd);
418
419    }
420 #endif
421
422    Mmsg(mdb->cmd, 
423  "SELECT JobId from Job "
424   "WHERE HasCache = 0 "
425     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
426   "ORDER BY JobId");
427
428    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
429
430    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
431
432    db_end_transaction(jcr, mdb);
433    db_start_transaction(jcr, mdb);
434    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
435    Mmsg(mdb->cmd, 
436         "DELETE FROM PathVisibility "
437          "WHERE NOT EXISTS "
438         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
439    nb = DELETE_DB(jcr, mdb, mdb->cmd);
440    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
441
442    db_end_transaction(jcr, mdb);
443    db_unlock(mdb);
444 }
445
446 /*
447  * Update the bvfs cache for given jobids (1,2,3,4)
448  */
449 void
450 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
451 {
452    pathid_cache ppathid_cache;
453    JobId_t JobId;
454    char *p;
455
456    for (p=jobids; ; ) {
457       int stat = get_next_jobid_from_list(&p, &JobId);
458       if (stat < 0) {
459          return;
460       }
461       if (stat == 0) {
462          break;
463       }
464       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
465       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
466    }
467 }
468
469 /* 
470  * Update the bvfs cache for current jobids
471  */
472 void Bvfs::update_cache()
473 {
474    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
475 }
476
477 /* Change the current directory, returns true if the path exists */
478 bool Bvfs::ch_dir(const char *path)
479 {
480    pm_strcpy(db->path, path);
481    db->pnl = strlen(db->path);
482    ch_dir(db_get_path_record(jcr, db)); 
483    return pwd_id != 0;
484 }
485
486 /* 
487  * Get all file versions for a specified client
488  * TODO: Handle basejobs using different client
489  */
490 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
491 {
492    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
493          (uint64_t)fnid, client);
494    char ed1[50], ed2[50];
495    POOL_MEM q;
496    if (see_copies) {
497       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
498    } else {
499       Mmsg(q, " AND Job.Type = 'B' ");
500    }
501
502    POOL_MEM query;
503
504    Mmsg(query,//    1           2              3       
505 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
506 //         4          5           6
507         "File.JobId, File.LStat, File.FileId, "
508 //         7                    8
509        "Media.VolumeName, Media.InChanger "
510 "FROM File, Job, Client, JobMedia, Media "
511 "WHERE File.FilenameId = %s "
512   "AND File.PathId=%s "
513   "AND File.JobId = Job.JobId "
514   "AND Job.JobId = JobMedia.JobId "
515   "AND File.FileIndex >= JobMedia.FirstIndex "
516   "AND File.FileIndex <= JobMedia.LastIndex "
517   "AND JobMedia.MediaId = Media.MediaId "
518   "AND Job.ClientId = Client.ClientId "
519   "AND Client.Name = '%s' "
520   "%s ORDER BY FileId LIMIT %d OFFSET %d"
521         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
522         limit, offset);
523    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
524    db_sql_query(db, query.c_str(), list_entries, user_data);
525 }
526
527 DBId_t Bvfs::get_root()
528 {
529    *db->path = 0;
530    return db_get_path_record(jcr, db);
531 }
532
533 static int path_handler(void *ctx, int fields, char **row)
534 {
535    Bvfs *fs = (Bvfs *) ctx;
536    return fs->_handle_path(ctx, fields, row);
537 }
538
539 int Bvfs::_handle_path(void *ctx, int fields, char **row)
540 {
541    if (bvfs_is_dir(row)) {
542       /* can have the same path 2 times */
543       if (strcmp(row[BVFS_Name], prev_dir)) {
544          pm_strcpy(prev_dir, row[BVFS_Name]);
545          return list_entries(user_data, fields, row);
546       }
547    }
548    return 0;
549 }
550
551 /* 
552  * Retrieve . and .. information
553  */
554 void Bvfs::ls_special_dirs()
555 {
556    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
557    char ed1[50], ed2[50];
558    if (*jobids == 0) {
559       return;
560    }
561    if (!dir_filenameid) {
562       get_dir_filenameid();
563    }
564
565    /* Will fetch directories  */
566    *prev_dir = 0;
567
568    POOL_MEM query;
569    Mmsg(query, 
570 "((SELECT PPathId AS PathId, '..' AS Path "
571     "FROM  PathHierarchy "
572    "WHERE  PathId = %s) "
573 "UNION "
574  "(SELECT %s AS PathId, '.' AS Path))",
575         edit_uint64(pwd_id, ed1), ed1);
576
577    POOL_MEM query2;
578    Mmsg(query2,// 1      2     3        4     5       6
579 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
580   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
581        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
582               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
583        "WHERE File1.FilenameId = %s "
584        "AND File1.JobId IN (%s)) AS listfile1 "
585   "ON (tmp.PathId = listfile1.PathId) "
586   "ORDER BY tmp.Path, JobId DESC ",
587         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
588
589    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
590    db_sql_query(db, query2.c_str(), path_handler, this);
591 }
592
593 /* Returns true if we have dirs to read */
594 bool Bvfs::ls_dirs()
595 {
596    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
597    char ed1[50], ed2[50];
598    if (*jobids == 0) {
599       return false;
600    }
601
602    POOL_MEM filter;
603    if (*pattern) {
604       Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, pattern);
605    }
606
607    if (!dir_filenameid) {
608       get_dir_filenameid();
609    }
610
611    /* the sql query displays same directory multiple time, take the first one */
612    *prev_dir = 0;
613
614    /* Let's retrieve the list of the visible dirs in this dir ...
615     * First, I need the empty filenameid to locate efficiently
616     * the dirs in the file table
617     * my $dir_filenameid = $self->get_dir_filenameid();
618     */
619    /* Then we get all the dir entries from File ... */
620    POOL_MEM query;
621    Mmsg(query,
622 //       0     1     2   3      4     5       6
623 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
624     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
625            "lower(Path1.Path) AS lpath, "
626            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
627            "listfile1.FileId AS FileId "
628     "FROM ( "
629       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
630       "FROM PathHierarchy AS PathHierarchy1 "
631       "JOIN Path AS Path2 "
632         "ON (PathHierarchy1.PathId = Path2.PathId) "
633       "JOIN PathVisibility AS PathVisibility1 "
634         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
635       "WHERE PathHierarchy1.PPathId = %s "
636       "AND PathVisibility1.jobid IN (%s) "
637            "%s "
638      ") AS listpath1 "
639    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
640
641    "LEFT JOIN ( " /* get attributes if any */
642        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
643               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
644        "WHERE File1.FilenameId = %s "
645        "AND File1.JobId IN (%s)) AS listfile1 "
646        "ON (listpath1.PathId = listfile1.PathId) "
647     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
648         edit_uint64(pwd_id, ed1),
649         jobids,
650         filter.c_str(),
651         edit_uint64(dir_filenameid, ed2),
652         jobids,
653         limit, offset);
654
655    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
656
657    db_lock(db);
658    db_sql_query(db, query.c_str(), path_handler, this);
659    nb_record = db->num_rows;
660    db_unlock(db);
661
662    return nb_record == limit;
663 }
664
665 /* Returns true if we have files to read */
666 bool Bvfs::ls_files()
667 {
668    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
669    char ed1[50];
670    if (*jobids == 0) {
671       return false;
672    }
673
674    if (!pwd_id) {
675       ch_dir(get_root());
676    }
677
678    POOL_MEM filter;
679    if (*pattern) {
680       Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, pattern);
681    }
682    /* TODO: Use JobTDate instead of FileId to determine the latest version */
683    POOL_MEM query;
684    Mmsg(query, //    1              2             3          4
685 "SELECT 'F', File.PathId, File.FilenameId, listfiles.Name, File.JobId, "
686         "File.LStat, listfiles.id "
687 "FROM File, ( "
688        "SELECT Filename.Name as Name, max(File.FileId) as id "
689          "FROM File, Filename "
690         "WHERE File.FilenameId = Filename.FilenameId "
691           "AND Filename.Name != '' "
692           "AND File.PathId = %s "
693           "AND File.JobId IN (%s) "
694           "%s "
695         "GROUP BY Filename.Name "
696         "ORDER BY Filename.Name LIMIT %d OFFSET %d "
697      ") AS listfiles "
698 "WHERE File.FileId = listfiles.id",
699         edit_uint64(pwd_id, ed1),
700         jobids,
701         filter.c_str(),
702         limit,
703         offset);
704    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
705
706    db_lock(db);
707    db_sql_query(db, query.c_str(), list_entries, user_data);
708    nb_record = db->num_rows;
709    db_unlock(db);
710
711    return nb_record == limit;
712 }
713
714
715 /* 
716  * Return next Id from comma separated list   
717  *
718  * Returns:
719  *   1 if next Id returned
720  *   0 if no more Ids are in list
721  *  -1 there is an error
722  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
723  */
724 static int get_next_id_from_list(char **p, int64_t *Id)
725 {
726    const int maxlen = 30;
727    char id[maxlen+1];
728    char *q = *p;
729
730    id[0] = 0;
731    for (int i=0; i<maxlen; i++) {
732       if (*q == 0) {
733          break;
734       } else if (*q == ',') {
735          q++;
736          break;
737       }
738       id[i] = *q++;
739       id[i+1] = 0;
740    }
741    if (id[0] == 0) {
742       return 0;
743    } else if (!is_a_number(id)) {
744       return -1;                      /* error */
745    }
746    *p = q;
747    *Id = str_to_int64(id);
748    return 1;
749 }
750
751 static int get_path_handler(void *ctx, int fields, char **row)
752 {
753    POOL_MEM *buf = (POOL_MEM *) ctx;
754    pm_strcpy(*buf, row[0]);
755    return 0;
756 }
757
758 static bool check_temp(char *output_table)
759 {
760    if (output_table[0] == 'b' &&
761        output_table[1] == '2' &&
762        is_an_integer(output_table + 2))
763    {
764       return true;
765    }
766    return false;
767 }
768
769 bool Bvfs::drop_restore_list(char *output_table)
770 {
771    POOL_MEM query;
772    if (check_temp(output_table)) {
773       Mmsg(query, "DROP TABLE %s", output_table);
774       db_sql_query(db, query.c_str(), NULL, NULL);
775       return true;
776    }
777    return false;
778 }
779
780 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
781                                 char *output_table)
782 {
783    POOL_MEM query;
784    POOL_MEM tmp, tmp2;
785    int64_t id, jobid;
786    bool init=false;
787    bool ret=false;
788    /* check args */
789    if ((*fileid   && !is_a_number_list(fileid))  ||
790        (*dirid    && !is_a_number_list(dirid))   ||
791        (*hardlink && !is_a_number_list(hardlink))||
792        (!*hardlink && !*fileid && !*dirid && !*hardlink))
793    {
794       return false;
795    }
796    if (!check_temp(output_table)) {
797       return false;
798    }
799
800    Mmsg(query, "CREATE TEMPORARY TABLE btemp%s AS ", output_table);
801
802    if (*fileid) {
803       init=true;
804       Mmsg(tmp, "(SELECT JobId, FileIndex, FilenameId, PathId, FileId "
805                    "FROM File WHERE FileId IN (%s))", fileid);
806       pm_strcat(query, tmp.c_str());
807    }
808
809    while (get_next_id_from_list(&dirid, &id) == 1) {
810       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
811       
812       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
813          /* print error */
814          return false;
815       }
816       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
817          Dmsg3(0, "Path not found %lld q=%s s=%s\n",
818                id, tmp.c_str(), tmp2.c_str());
819          break;
820       }
821       /* escape % and _ for LIKE search */
822       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
823       char *p = tmp.c_str();
824       for (char *s = tmp2.c_str(); *s ; s++) {
825          if (*s == '%' || *s == '_' || *s == '\\') {
826             *p = '\\'; 
827             p++;
828          }
829          *p = *s; 
830          p++;
831       }
832       *p = '\0';
833       tmp.strcat("%");
834
835       size_t len = strlen(tmp.c_str());
836       tmp2.check_size((len+1) * 2);
837       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
838
839       if (init) {
840          query.strcat(" UNION ");
841       }
842       Mmsg(tmp, "(SELECT File.JobId, File.FileIndex, File.FilenameId, "
843                         "File.PathId, FileId "
844                    "FROM Path JOIN File USING (PathId) "
845                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s)) ", 
846            tmp2.c_str(), jobids); 
847       query.strcat(tmp.c_str());
848       init = true;
849    }
850
851    /* expect jobid,fileindex */
852    int64_t prev_jobid=0;
853    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
854       if (get_next_id_from_list(&hardlink, &id) != 1) {
855          return false;
856       }
857       if (jobid != prev_jobid) { /* new job */
858          if (prev_jobid == 0) {  /* first jobid */
859             if (init) {
860                query.strcat(" UNION ");
861             }
862          } else {               /* end last job, start new one */
863             tmp.strcat(")) UNION ");
864             query.strcat(tmp.c_str());
865          }
866          Mmsg(tmp, "(SELECT JobId, FileIndex, FilenameId, PathId, FileId "
867                        "FROM File WHERE JobId = %lld " 
868                         "AND FileIndex IN (%lld", jobid, id);
869          prev_jobid = jobid;
870
871       } else {                  /* same job, add new findex */
872          Mmsg(tmp2, ", %lld", id);
873          tmp.strcat(tmp2.c_str());
874       }
875    }
876
877    if (prev_jobid != 0) {       /* end last job */
878       tmp.strcat(")) ");
879       query.strcat(tmp.c_str());
880       init = true;
881    }
882
883    Dmsg1(0, "q=%s\n", query.c_str());
884
885    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
886       goto bail_out;
887    }
888
889    Mmsg(query, "CREATE TABLE %s AS ( "
890         "SELECT JobId, FileIndex, FileId "
891           "FROM ( "
892      "SELECT DISTINCT ON (PathId, FilenameId) JobId, FileIndex, FileId "
893        "FROM btemp%s "
894       "ORDER BY PathId, FilenameId, JobId DESC "
895           ") AS T "
896           "WHERE FileIndex > 0)", output_table, output_table);
897
898    Dmsg1(0, "q=%s\n", query.c_str());
899    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
900       goto bail_out;
901    }
902    ret = true;
903
904 bail_out:
905    Mmsg(query, "DROP TABLE btemp%s", output_table);
906    db_sql_query(db, query.c_str(), NULL, NULL);
907    return ret;
908 }