]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
tweak queries for bvfs
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats/cats.h"
33 #include "lib/htable.h"
34 #include "bvfs.h"
35
36 #define dbglevel 10
37 #define dbglevel_sql 15
38
39 static int result_handler(void *ctx, int fields, char **row)
40 {
41    if (fields == 4) {
42       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
43             row[0], row[1], row[2], row[3]);
44    } else if (fields == 5) {
45       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3], row[4]);
47    } else if (fields == 6) {
48       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4], row[5]);
50    } else if (fields == 7) {
51       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
53    }
54    return 0;
55 }
56
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
58    jcr = j;
59    jcr->inc_use_count();
60    db = mdb;                 /* need to inc ref count */
61    jobids = get_pool_memory(PM_NAME);
62    prev_dir = get_pool_memory(PM_NAME);
63    pattern = get_pool_memory(PM_NAME);
64    *jobids = *prev_dir = *pattern = 0;
65    dir_filenameid = pwd_id = offset = 0;
66    see_copies = see_all_versions = false;
67    limit = 1000;
68    attr = new_attr(jcr);
69    list_entries = result_handler;
70    user_data = this;
71 }
72
73 Bvfs::~Bvfs() {
74    free_pool_memory(jobids);
75    free_pool_memory(pattern);
76    free_pool_memory(prev_dir);
77    free_attr(attr);
78    jcr->dec_use_count();
79 }
80
81 /* 
82  * TODO: Find a way to let the user choose how he wants to display
83  * files and directories
84  */
85
86
87 /* 
88  * Working Object to store PathId already seen (avoid
89  * database queries), equivalent to %cache_ppathid in perl
90  */
91
92 #define NITEMS 50000
93 class pathid_cache {
94 private:
95    hlink *nodes;
96    int nb_node;
97    int max_node;
98
99    alist *table_node;
100
101    htable *cache_ppathid;
102
103 public:
104    pathid_cache() {
105       hlink link;
106       cache_ppathid = (htable *)malloc(sizeof(htable));
107       cache_ppathid->init(&link, &link, NITEMS);
108       max_node = NITEMS;
109       nodes = (hlink *) malloc(max_node * sizeof (hlink));
110       nb_node = 0;
111       table_node = New(alist(5, owned_by_alist));
112       table_node->append(nodes);
113    }
114
115    hlink *get_hlink() {
116       if (++nb_node >= max_node) {
117          nb_node = 0;
118          nodes = (hlink *)malloc(max_node * sizeof(hlink));
119          table_node->append(nodes);
120       }
121       return nodes + nb_node;
122    }
123
124    bool lookup(char *pathid) {
125       bool ret = cache_ppathid->lookup(pathid) != NULL;
126       return ret;
127    }
128    
129    void insert(char *pathid) {
130       hlink *h = get_hlink();
131       cache_ppathid->insert(pathid, h);
132    }
133
134    ~pathid_cache() {
135       cache_ppathid->destroy();
136       free(cache_ppathid);
137       delete table_node;
138    }
139 private:
140    pathid_cache(const pathid_cache &); /* prohibit pass by value */
141    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
142 } ;
143
144 /* Return the parent_dir with the trailing /  (update the given string)
145  * TODO: see in the rest of bacula if we don't have already this function
146  * dir=/tmp/toto/
147  * dir=/tmp/
148  * dir=/
149  * dir=
150  */
151 char *bvfs_parent_dir(char *path)
152 {
153    char *p = path;
154    int len = strlen(path) - 1;
155
156    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
157       path[len] = '\0';
158    }
159
160    if (len > 0) {
161       p += len;
162       while (p > path && !IsPathSeparator(*p)) {
163          p--;
164       }
165       p[1] = '\0';
166    }
167    return path;
168 }
169
170 /* Return the basename of the with the trailing /
171  * TODO: see in the rest of bacula if we don't have
172  * this function already
173  */
174 char *bvfs_basename_dir(char *path)
175 {
176    char *p = path;
177    int len = strlen(path) - 1;
178
179    if (path[len] == '/') {      /* if directory, skip last / */
180       len -= 1;
181    }
182
183    if (len > 0) {
184       p += len;
185       while (p > path && !IsPathSeparator(*p)) {
186          p--;
187       }
188       if (*p == '/') {
189          p++;                  /* skip first / */
190       }
191    } 
192    return p;
193 }
194
195 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
196                                  pathid_cache &ppathid_cache, 
197                                  char *org_pathid, char *path)
198 {
199    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
200    char pathid[50];
201    ATTR_DBR parent;
202    char *bkp = mdb->path;
203    strncpy(pathid, org_pathid, sizeof(pathid));
204
205    /* Does the ppathid exist for this ? we use a memory cache...  In order to
206     * avoid the full loop, we consider that if a dir is allready in the
207     * PathHierarchy table, then there is no need to calculate all the
208     * hierarchy
209     */
210    while (path && *path)
211    {
212       if (!ppathid_cache.lookup(pathid))
213       {
214          Mmsg(mdb->cmd, 
215               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
216               pathid);
217
218          QUERY_DB(jcr, mdb, mdb->cmd);
219          /* Do we have a result ? */
220          if (sql_num_rows(mdb) > 0) {
221             ppathid_cache.insert(pathid);
222             /* This dir was in the db ...
223              * It means we can leave, the tree has allready been built for
224              * this dir
225              */
226             goto bail_out;
227          } else {
228             /* search or create parent PathId in Path table */
229             mdb->path = bvfs_parent_dir(path);
230             mdb->pnl = strlen(mdb->path);
231             if (!db_create_path_record(jcr, mdb, &parent)) {
232                goto bail_out;
233             }
234             ppathid_cache.insert(pathid);
235             
236             Mmsg(mdb->cmd,
237                  "INSERT INTO PathHierarchy (PathId, PPathId) "
238                  "VALUES (%s,%lld)",
239                  pathid, (uint64_t) parent.PathId);
240             
241             INSERT_DB(jcr, mdb, mdb->cmd);
242
243             edit_uint64(parent.PathId, pathid);
244             path = mdb->path;   /* already done */
245          }
246       } else {
247          /* It's already in the cache.  We can leave, no time to waste here,
248           * all the parent dirs have allready been done
249           */
250          goto bail_out;
251       }
252    }   
253
254 bail_out:
255    mdb->path = bkp;
256    mdb->fnl = 0;
257 }
258
259 /* 
260  * Internal function to update path_hierarchy cache with a shared pathid cache
261  */
262 static void update_path_hierarchy_cache(JCR *jcr,
263                                         B_DB *mdb,
264                                         pathid_cache &ppathid_cache,
265                                         JobId_t JobId)
266 {
267    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
268
269    uint32_t num;
270    char jobid[50];
271    edit_uint64(JobId, jobid);
272  
273    db_lock(mdb);
274    db_start_transaction(jcr, mdb);
275
276    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
277    
278    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
279       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
280       goto bail_out;
281    }
282
283    /* Inserting path records for JobId */
284    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
285                    "SELECT DISTINCT PathId, JobId "
286                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
287                            "UNION "
288                            "SELECT PathId, BaseFiles.JobId "
289                              "FROM BaseFiles JOIN File AS F USING (FileId) "
290                             "WHERE BaseFiles.JobId = %s) AS B",
291         jobid, jobid);
292    QUERY_DB(jcr, mdb, mdb->cmd);
293
294    /* Now we have to do the directory recursion stuff to determine missing
295     * visibility We try to avoid recursion, to be as fast as possible We also
296     * only work on not allready hierarchised directories...
297     */
298    Mmsg(mdb->cmd, 
299      "SELECT PathVisibility.PathId, Path "
300        "FROM PathVisibility "
301             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
302             "LEFT JOIN PathHierarchy "
303          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
304       "WHERE PathVisibility.JobId = %s "
305         "AND PathHierarchy.PathId IS NULL "
306       "ORDER BY Path", jobid);
307    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
308    QUERY_DB(jcr, mdb, mdb->cmd);
309
310    /* TODO: I need to reuse the DB connection without emptying the result 
311     * So, now i'm copying the result in memory to be able to query the
312     * catalog descriptor again.
313     */
314    num = sql_num_rows(mdb);
315    if (num > 0) {
316       char **result = (char **)malloc (num * 2 * sizeof(char *));
317       
318       SQL_ROW row;
319       int i=0;
320       while((row = sql_fetch_row(mdb))) {
321          result[i++] = bstrdup(row[0]);
322          result[i++] = bstrdup(row[1]);
323       }
324       
325       i=0;
326       while (num > 0) {
327          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
328          free(result[i++]);
329          free(result[i++]);
330          num--;
331       }
332       free(result);
333    }
334
335    Mmsg(mdb->cmd, 
336   "INSERT INTO PathVisibility (PathId, JobId)  "
337    "SELECT a.PathId,%s "
338    "FROM ( "
339      "SELECT DISTINCT h.PPathId AS PathId "
340        "FROM PathHierarchy AS h "
341        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
342       "WHERE p.JobId=%s) AS a LEFT JOIN "
343        "(SELECT PathId "
344           "FROM PathVisibility "
345          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
346    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
347
348    do {
349       QUERY_DB(jcr, mdb, mdb->cmd);
350    } while (sql_affected_rows(mdb) > 0);
351    
352    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
353    UPDATE_DB(jcr, mdb, mdb->cmd);
354
355 bail_out:
356    db_end_transaction(jcr, mdb);
357    db_unlock(mdb);
358 }
359
360 /* 
361  * Find an store the filename descriptor for empty directories Filename.Name=''
362  */
363 DBId_t Bvfs::get_dir_filenameid()
364 {
365    uint32_t id;
366    if (dir_filenameid) {
367       return dir_filenameid;
368    }
369    POOL_MEM q;
370    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
371    db_sql_query(db, q.c_str(), db_int_handler, &id);
372    dir_filenameid = id;
373    return dir_filenameid;
374 }
375
376 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
377 {
378    uint32_t nb=0;
379    db_list_ctx jobids_list;
380
381    db_lock(mdb);
382    db_start_transaction(jcr, mdb);
383
384 #ifdef xxx
385    /* TODO: Remove this code when updating make_bacula_table script */
386    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
387    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
388       Dmsg0(dbglevel, "Creating cache table\n");
389       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
390       QUERY_DB(jcr, mdb, mdb->cmd);
391
392       Mmsg(mdb->cmd,
393            "CREATE TABLE PathHierarchy ( "
394            "PathId integer NOT NULL, "
395            "PPathId integer NOT NULL, "
396            "CONSTRAINT pathhierarchy_pkey "
397            "PRIMARY KEY (PathId))");
398       QUERY_DB(jcr, mdb, mdb->cmd); 
399
400       Mmsg(mdb->cmd,
401            "CREATE INDEX pathhierarchy_ppathid "
402            "ON PathHierarchy (PPathId)");
403       QUERY_DB(jcr, mdb, mdb->cmd);
404
405       Mmsg(mdb->cmd, 
406            "CREATE TABLE PathVisibility ("
407            "PathId integer NOT NULL, "
408            "JobId integer NOT NULL, "
409            "Size int8 DEFAULT 0, "
410            "Files int4 DEFAULT 0, "
411            "CONSTRAINT pathvisibility_pkey "
412            "PRIMARY KEY (JobId, PathId))");
413       QUERY_DB(jcr, mdb, mdb->cmd);
414
415       Mmsg(mdb->cmd, 
416            "CREATE INDEX pathvisibility_jobid "
417            "ON PathVisibility (JobId)");
418       QUERY_DB(jcr, mdb, mdb->cmd);
419
420    }
421 #endif
422
423    Mmsg(mdb->cmd, 
424  "SELECT JobId from Job "
425   "WHERE HasCache = 0 "
426     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
427   "ORDER BY JobId");
428
429    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
430
431    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
432
433    db_end_transaction(jcr, mdb);
434    db_start_transaction(jcr, mdb);
435    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
436    Mmsg(mdb->cmd, 
437         "DELETE FROM PathVisibility "
438          "WHERE NOT EXISTS "
439         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
440    nb = DELETE_DB(jcr, mdb, mdb->cmd);
441    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
442
443    db_end_transaction(jcr, mdb);
444    db_unlock(mdb);
445 }
446
447 /*
448  * Update the bvfs cache for given jobids (1,2,3,4)
449  */
450 void
451 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
452 {
453    pathid_cache ppathid_cache;
454    JobId_t JobId;
455    char *p;
456
457    for (p=jobids; ; ) {
458       int stat = get_next_jobid_from_list(&p, &JobId);
459       if (stat < 0) {
460          return;
461       }
462       if (stat == 0) {
463          break;
464       }
465       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
466       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
467    }
468 }
469
470 /* 
471  * Update the bvfs cache for current jobids
472  */
473 void Bvfs::update_cache()
474 {
475    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
476 }
477
478 /* Change the current directory, returns true if the path exists */
479 bool Bvfs::ch_dir(const char *path)
480 {
481    pm_strcpy(db->path, path);
482    db->pnl = strlen(db->path);
483    ch_dir(db_get_path_record(jcr, db)); 
484    return pwd_id != 0;
485 }
486
487 /* 
488  * Get all file versions for a specified client
489  * TODO: Handle basejobs using different client
490  */
491 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
492 {
493    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
494          (uint64_t)fnid, client);
495    char ed1[50], ed2[50];
496    POOL_MEM q;
497    if (see_copies) {
498       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
499    } else {
500       Mmsg(q, " AND Job.Type = 'B' ");
501    }
502
503    POOL_MEM query;
504
505    Mmsg(query,//    1           2              3       
506 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
507 //         4          5           6
508         "File.JobId, File.LStat, File.FileId, "
509 //         7                    8
510        "Media.VolumeName, Media.InChanger "
511 "FROM File, Job, Client, JobMedia, Media "
512 "WHERE File.FilenameId = %s "
513   "AND File.PathId=%s "
514   "AND File.JobId = Job.JobId "
515   "AND Job.JobId = JobMedia.JobId "
516   "AND File.FileIndex >= JobMedia.FirstIndex "
517   "AND File.FileIndex <= JobMedia.LastIndex "
518   "AND JobMedia.MediaId = Media.MediaId "
519   "AND Job.ClientId = Client.ClientId "
520   "AND Client.Name = '%s' "
521   "%s ORDER BY FileId LIMIT %d OFFSET %d"
522         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
523         limit, offset);
524    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
525    db_sql_query(db, query.c_str(), list_entries, user_data);
526 }
527
528 DBId_t Bvfs::get_root()
529 {
530    *db->path = 0;
531    return db_get_path_record(jcr, db);
532 }
533
534 static int path_handler(void *ctx, int fields, char **row)
535 {
536    Bvfs *fs = (Bvfs *) ctx;
537    return fs->_handle_path(ctx, fields, row);
538 }
539
540 int Bvfs::_handle_path(void *ctx, int fields, char **row)
541 {
542    if (bvfs_is_dir(row)) {
543       /* can have the same path 2 times */
544       if (strcmp(row[BVFS_Name], prev_dir)) {
545          pm_strcpy(prev_dir, row[BVFS_Name]);
546          return list_entries(user_data, fields, row);
547       }
548    }
549    return 0;
550 }
551
552 /* 
553  * Retrieve . and .. information
554  */
555 void Bvfs::ls_special_dirs()
556 {
557    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
558    char ed1[50], ed2[50];
559    if (*jobids == 0) {
560       return;
561    }
562    if (!dir_filenameid) {
563       get_dir_filenameid();
564    }
565
566    /* Will fetch directories  */
567    *prev_dir = 0;
568
569    POOL_MEM query;
570    Mmsg(query, 
571 "((SELECT PPathId AS PathId, '..' AS Path "
572     "FROM  PathHierarchy "
573    "WHERE  PathId = %s) "
574 "UNION "
575  "(SELECT %s AS PathId, '.' AS Path))",
576         edit_uint64(pwd_id, ed1), ed1);
577
578    POOL_MEM query2;
579    Mmsg(query2,// 1      2     3        4     5       6
580 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
581   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
582        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
583               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
584        "WHERE File1.FilenameId = %s "
585        "AND File1.JobId IN (%s)) AS listfile1 "
586   "ON (tmp.PathId = listfile1.PathId) "
587   "ORDER BY tmp.Path, JobId DESC ",
588         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
589
590    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
591    db_sql_query(db, query2.c_str(), path_handler, this);
592 }
593
594 /* Returns true if we have dirs to read */
595 bool Bvfs::ls_dirs()
596 {
597    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
598    char ed1[50], ed2[50];
599    if (*jobids == 0) {
600       return false;
601    }
602
603    POOL_MEM filter;
604    if (*pattern) {
605       Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, pattern);
606    }
607
608    if (!dir_filenameid) {
609       get_dir_filenameid();
610    }
611
612    /* the sql query displays same directory multiple time, take the first one */
613    *prev_dir = 0;
614
615    /* Let's retrieve the list of the visible dirs in this dir ...
616     * First, I need the empty filenameid to locate efficiently
617     * the dirs in the file table
618     * my $dir_filenameid = $self->get_dir_filenameid();
619     */
620    /* Then we get all the dir entries from File ... */
621    POOL_MEM query;
622    Mmsg(query,
623 //       0     1     2   3      4     5       6
624 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
625     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
626            "lower(Path1.Path) AS lpath, "
627            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
628            "listfile1.FileId AS FileId "
629     "FROM ( "
630       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
631       "FROM PathHierarchy AS PathHierarchy1 "
632       "JOIN Path AS Path2 "
633         "ON (PathHierarchy1.PathId = Path2.PathId) "
634       "JOIN PathVisibility AS PathVisibility1 "
635         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
636       "WHERE PathHierarchy1.PPathId = %s "
637       "AND PathVisibility1.jobid IN (%s) "
638            "%s "
639      ") AS listpath1 "
640    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
641
642    "LEFT JOIN ( " /* get attributes if any */
643        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
644               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
645        "WHERE File1.FilenameId = %s "
646        "AND File1.JobId IN (%s)) AS listfile1 "
647        "ON (listpath1.PathId = listfile1.PathId) "
648     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
649         edit_uint64(pwd_id, ed1),
650         jobids,
651         filter.c_str(),
652         edit_uint64(dir_filenameid, ed2),
653         jobids,
654         limit, offset);
655
656    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
657
658    db_lock(db);
659    db_sql_query(db, query.c_str(), path_handler, this);
660    nb_record = db->num_rows;
661    db_unlock(db);
662
663    return nb_record == limit;
664 }
665
666 /* Returns true if we have files to read */
667 bool Bvfs::ls_files()
668 {
669    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
670    char ed1[50];
671    if (*jobids == 0) {
672       return false;
673    }
674
675    if (!pwd_id) {
676       ch_dir(get_root());
677    }
678
679    POOL_MEM filter;
680    if (*pattern) {
681       Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, pattern);
682    }
683    /* TODO: Use JobTDate instead of FileId to determine the latest version */
684
685 /* Postgresql
686  SELECT DISTINCT ON (FilenameId) 'F', PathId, T.FilenameId, 
687   Filename.Name, JobId, LStat, FileId
688    FROM 
689        (SELECT FileId, JobId, PathId, FilenameId, FileIndex, LStat, MD5 
690           FROM File WHERE JobId IN (7) AND PathId = 9
691          UNION ALL 
692         SELECT File.FileId, File.JobId, PathId, FilenameId, 
693                File.FileIndex, LStat, MD5 
694           FROM BaseFiles JOIN File USING (FileId) 
695          WHERE BaseFiles.JobId IN (7) AND File.PathId = 9
696         ) AS T JOIN Job USING (JobId) JOIN Filename USING (FilenameId)
697    ORDER BY FilenameId, StartTime DESC
698
699 Mysql
700 SELECT FileId, Job.JobId AS JobId, FileIndex, File.PathId AS PathId, 
701        File.FilenameId AS FilenameId, Filename.Name, LStat, MD5 
702 FROM Job, File, ( 
703     SELECT MAX(JobTDate) AS JobTDate, PathId, FilenameId 
704       FROM ( 
705         SELECT JobTDate, PathId, FilenameId
706           FROM File JOIN Job USING (JobId)
707          WHERE File.JobId IN (7) AND PathId = 9
708           UNION ALL 
709         SELECT JobTDate, PathId, FilenameId 
710           FROM BaseFiles                 
711                JOIN File USING (FileId) 
712                JOIN Job  ON    (BaseJobId = Job.JobId) 
713          WHERE BaseFiles.JobId IN (7)   AND PathId = 9
714        ) AS tmp GROUP BY PathId, FilenameId
715     ) AS T1 JOIN Filename USING (FilenameId)
716 WHERE (Job.JobId IN (  
717         SELECT DISTINCT BaseJobId FROM BaseFiles WHERE JobId IN (7)) 
718         OR Job.JobId IN (7)) 
719   AND T1.JobTDate = Job.JobTDate
720   AND Job.JobId = File.JobId
721   AND T1.PathId = File.PathId 
722   AND T1.FilenameId = File.FilenameId
723
724
725
726 */
727
728    POOL_MEM query;
729    Mmsg(query, //    1              2             3          4
730 "SELECT 'F', File.PathId, File.FilenameId, listfiles.Name, File.JobId, "
731         "File.LStat, listfiles.id "
732 "FROM File, ( "
733        "SELECT Filename.Name as Name, max(File.FileId) as id "
734          "FROM File, Filename "
735         "WHERE File.FilenameId = Filename.FilenameId "
736           "AND Filename.Name != '' "
737           "AND File.PathId = %s "
738           "AND File.JobId IN (%s) "
739           "%s "
740         "GROUP BY Filename.Name "
741         "ORDER BY Filename.Name LIMIT %d OFFSET %d "
742      ") AS listfiles "
743 "WHERE File.FileId = listfiles.id",
744         edit_uint64(pwd_id, ed1),
745         jobids,
746         filter.c_str(),
747         limit,
748         offset);
749    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
750
751    db_lock(db);
752    db_sql_query(db, query.c_str(), list_entries, user_data);
753    nb_record = db->num_rows;
754    db_unlock(db);
755
756    return nb_record == limit;
757 }
758
759
760 /* 
761  * Return next Id from comma separated list   
762  *
763  * Returns:
764  *   1 if next Id returned
765  *   0 if no more Ids are in list
766  *  -1 there is an error
767  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
768  */
769 static int get_next_id_from_list(char **p, int64_t *Id)
770 {
771    const int maxlen = 30;
772    char id[maxlen+1];
773    char *q = *p;
774
775    id[0] = 0;
776    for (int i=0; i<maxlen; i++) {
777       if (*q == 0) {
778          break;
779       } else if (*q == ',') {
780          q++;
781          break;
782       }
783       id[i] = *q++;
784       id[i+1] = 0;
785    }
786    if (id[0] == 0) {
787       return 0;
788    } else if (!is_a_number(id)) {
789       return -1;                      /* error */
790    }
791    *p = q;
792    *Id = str_to_int64(id);
793    return 1;
794 }
795
796 static int get_path_handler(void *ctx, int fields, char **row)
797 {
798    POOL_MEM *buf = (POOL_MEM *) ctx;
799    pm_strcpy(*buf, row[0]);
800    return 0;
801 }
802
803 static bool check_temp(char *output_table)
804 {
805    if (output_table[0] == 'b' &&
806        output_table[1] == '2' &&
807        is_an_integer(output_table + 2))
808    {
809       return true;
810    }
811    return false;
812 }
813
814 bool Bvfs::drop_restore_list(char *output_table)
815 {
816    POOL_MEM query;
817    if (check_temp(output_table)) {
818       Mmsg(query, "DROP TABLE %s", output_table);
819       db_sql_query(db, query.c_str(), NULL, NULL);
820       return true;
821    }
822    return false;
823 }
824
825 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
826                                 char *output_table)
827 {
828    POOL_MEM query;
829    POOL_MEM tmp, tmp2;
830    int64_t id, jobid;
831    bool init=false;
832    bool ret=false;
833    /* check args */
834    if ((*fileid   && !is_a_number_list(fileid))  ||
835        (*dirid    && !is_a_number_list(dirid))   ||
836        (*hardlink && !is_a_number_list(hardlink))||
837        (!*hardlink && !*fileid && !*dirid && !*hardlink))
838    {
839       return false;
840    }
841    if (!check_temp(output_table)) {
842       return false;
843    }
844
845    Mmsg(query, "CREATE TEMPORARY TABLE btemp%s AS ", output_table);
846
847    if (*fileid) {
848       init=true;
849       Mmsg(tmp, "(SELECT JobId, FileIndex, FilenameId, PathId, FileId "
850                    "FROM File WHERE FileId IN (%s))", fileid);
851       pm_strcat(query, tmp.c_str());
852    }
853
854    while (get_next_id_from_list(&dirid, &id) == 1) {
855       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
856       
857       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
858          Dmsg0(dbglevel, "Can't search for path\n");
859          /* print error */
860          return false;
861       }
862       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
863          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
864                id, tmp.c_str(), tmp2.c_str());
865          break;
866       }
867       /* escape % and _ for LIKE search */
868       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
869       char *p = tmp.c_str();
870       for (char *s = tmp2.c_str(); *s ; s++) {
871          if (*s == '%' || *s == '_' || *s == '\\') {
872             *p = '\\'; 
873             p++;
874          }
875          *p = *s; 
876          p++;
877       }
878       *p = '\0';
879       tmp.strcat("%");
880
881       size_t len = strlen(tmp.c_str());
882       tmp2.check_size((len+1) * 2);
883       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
884
885       if (init) {
886          query.strcat(" UNION ");
887       }
888       Mmsg(tmp, "(SELECT File.JobId, File.FileIndex, File.FilenameId, "
889                         "File.PathId, FileId "
890                    "FROM Path JOIN File USING (PathId) "
891                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s)) ", 
892            tmp2.c_str(), jobids); 
893       query.strcat(tmp.c_str());
894       init = true;
895    }
896
897    /* expect jobid,fileindex */
898    int64_t prev_jobid=0;
899    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
900       if (get_next_id_from_list(&hardlink, &id) != 1) {
901          Dmsg0(dbglevel, "hardlink should be two by two\n");
902          return false;
903       }
904       if (jobid != prev_jobid) { /* new job */
905          if (prev_jobid == 0) {  /* first jobid */
906             if (init) {
907                query.strcat(" UNION ");
908             }
909          } else {               /* end last job, start new one */
910             tmp.strcat(")) UNION ");
911             query.strcat(tmp.c_str());
912          }
913          Mmsg(tmp, "(SELECT JobId, FileIndex, FilenameId, PathId, FileId "
914                        "FROM File WHERE JobId = %lld " 
915                         "AND FileIndex IN (%lld", jobid, id);
916          prev_jobid = jobid;
917
918       } else {                  /* same job, add new findex */
919          Mmsg(tmp2, ", %lld", id);
920          tmp.strcat(tmp2.c_str());
921       }
922    }
923
924    if (prev_jobid != 0) {       /* end last job */
925       tmp.strcat(")) ");
926       query.strcat(tmp.c_str());
927       init = true;
928    }
929
930    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
931
932    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
933       Dmsg0(dbglevel, "Can't execute q\n");
934       goto bail_out;
935    }
936
937    /* TODO: handle basejob and SQLite3 */
938    Mmsg(query, sql_bvfs_select[db_type], output_table, output_table);
939
940    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
941    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
942       Dmsg0(dbglevel, "Can't execute q\n");
943       goto bail_out;
944    }
945    ret = true;
946
947 bail_out:
948    Mmsg(query, "DROP TABLE btemp%s", output_table);
949    db_sql_query(db, query.c_str(), NULL, NULL);
950    return ret;
951 }