]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/bvfs.c
bvfs: Use single transaction for each job during update
[bacula/bacula] / bacula / src / cats / bvfs.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2009-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation, which is 
11    listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28
29 #include "bacula.h"
30
31 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
32
33 #include "cats.h"
34 #include "bdb_priv.h"
35 #include "sql_glue.h"
36 #include "lib/htable.h"
37 #include "bvfs.h"
38
39 #define dbglevel 10
40 #define dbglevel_sql 15
41
42 static int result_handler(void *ctx, int fields, char **row)
43 {
44    if (fields == 4) {
45       Pmsg4(0, "%s\t%s\t%s\t%s\n", 
46             row[0], row[1], row[2], row[3]);
47    } else if (fields == 5) {
48       Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n", 
49             row[0], row[1], row[2], row[3], row[4]);
50    } else if (fields == 6) {
51       Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n", 
52             row[0], row[1], row[2], row[3], row[4], row[5]);
53    } else if (fields == 7) {
54       Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n", 
55             row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
56    }
57    return 0;
58 }
59
60 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
61    jcr = j;
62    jcr->inc_use_count();
63    db = mdb;                 /* need to inc ref count */
64    jobids = get_pool_memory(PM_NAME);
65    prev_dir = get_pool_memory(PM_NAME);
66    pattern = get_pool_memory(PM_NAME);
67    *jobids = *prev_dir = *pattern = 0;
68    dir_filenameid = pwd_id = offset = 0;
69    see_copies = see_all_versions = false;
70    limit = 1000;
71    attr = new_attr(jcr);
72    list_entries = result_handler;
73    user_data = this;
74    username = NULL;
75 }
76
77 Bvfs::~Bvfs() {
78    free_pool_memory(jobids);
79    free_pool_memory(pattern);
80    free_pool_memory(prev_dir);
81    if (username) {
82       free(username);
83    }
84    free_attr(attr);
85    jcr->dec_use_count();
86 }
87
88 void Bvfs::filter_jobid()
89 {
90    if (!username) {
91       return;
92    }
93
94    /* Query used by Bweb to filter clients, activated when using
95     * set_username() 
96     */
97    POOL_MEM query;
98    Mmsg(query,
99       "SELECT DISTINCT JobId FROM Job JOIN Client USING (ClientId) "
100         "JOIN (SELECT ClientId FROM client_group_member "
101         "JOIN client_group USING (client_group_id) "
102         "JOIN bweb_client_group_acl USING (client_group_id) "
103         "JOIN bweb_user USING (userid) "
104        "WHERE bweb_user.username = '%s' "
105       ") AS filter USING (ClientId) "
106         " WHERE JobId IN (%s)", 
107         username, jobids);
108
109    db_list_ctx ctx;
110    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
111    db_sql_query(db, query.c_str(), db_list_handler, &ctx);
112    pm_strcpy(jobids, ctx.list);
113 }
114
115 void Bvfs::set_jobid(JobId_t id)
116 {
117    Mmsg(jobids, "%lld", (uint64_t)id);
118    filter_jobid();
119 }
120
121 void Bvfs::set_jobids(char *ids)
122 {
123    pm_strcpy(jobids, ids);
124    filter_jobid();
125 }
126
127 /* 
128  * TODO: Find a way to let the user choose how he wants to display
129  * files and directories
130  */
131
132
133 /* 
134  * Working Object to store PathId already seen (avoid
135  * database queries), equivalent to %cache_ppathid in perl
136  */
137
138 #define NITEMS 50000
139 class pathid_cache {
140 private:
141    hlink *nodes;
142    int nb_node;
143    int max_node;
144
145    alist *table_node;
146
147    htable *cache_ppathid;
148
149 public:
150    pathid_cache() {
151       hlink link;
152       cache_ppathid = (htable *)malloc(sizeof(htable));
153       cache_ppathid->init(&link, &link, NITEMS);
154       max_node = NITEMS;
155       nodes = (hlink *) malloc(max_node * sizeof (hlink));
156       nb_node = 0;
157       table_node = New(alist(5, owned_by_alist));
158       table_node->append(nodes);
159    }
160
161    hlink *get_hlink() {
162       if (++nb_node >= max_node) {
163          nb_node = 0;
164          nodes = (hlink *)malloc(max_node * sizeof(hlink));
165          table_node->append(nodes);
166       }
167       return nodes + nb_node;
168    }
169
170    bool lookup(char *pathid) {
171       bool ret = cache_ppathid->lookup(pathid) != NULL;
172       return ret;
173    }
174    
175    void insert(char *pathid) {
176       hlink *h = get_hlink();
177       cache_ppathid->insert(pathid, h);
178    }
179
180    ~pathid_cache() {
181       cache_ppathid->destroy();
182       free(cache_ppathid);
183       delete table_node;
184    }
185 private:
186    pathid_cache(const pathid_cache &); /* prohibit pass by value */
187    pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
188 } ;
189
190 /* Return the parent_dir with the trailing /  (update the given string)
191  * TODO: see in the rest of bacula if we don't have already this function
192  * dir=/tmp/toto/
193  * dir=/tmp/
194  * dir=/
195  * dir=
196  */
197 char *bvfs_parent_dir(char *path)
198 {
199    char *p = path;
200    int len = strlen(path) - 1;
201
202    /* windows directory / */
203    if (len == 2 && B_ISALPHA(path[0]) 
204                 && path[1] == ':' 
205                 && path[2] == '/')
206    {
207       len = 0;
208       path[0] = '\0';
209    }
210
211    if (len >= 0 && path[len] == '/') {      /* if directory, skip last / */
212       path[len] = '\0';
213    }
214
215    if (len > 0) {
216       p += len;
217       while (p > path && !IsPathSeparator(*p)) {
218          p--;
219       }
220       p[1] = '\0';
221    }
222    return path;
223 }
224
225 /* Return the basename of the with the trailing /
226  * TODO: see in the rest of bacula if we don't have
227  * this function already
228  */
229 char *bvfs_basename_dir(char *path)
230 {
231    char *p = path;
232    int len = strlen(path) - 1;
233
234    if (path[len] == '/') {      /* if directory, skip last / */
235       len -= 1;
236    }
237
238    if (len > 0) {
239       p += len;
240       while (p > path && !IsPathSeparator(*p)) {
241          p--;
242       }
243       if (*p == '/') {
244          p++;                  /* skip first / */
245       }
246    } 
247    return p;
248 }
249
250 static void build_path_hierarchy(JCR *jcr, B_DB *mdb, 
251                                  pathid_cache &ppathid_cache, 
252                                  char *org_pathid, char *path)
253 {
254    Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
255    char pathid[50];
256    ATTR_DBR parent;
257    char *bkp = mdb->path;
258    strncpy(pathid, org_pathid, sizeof(pathid));
259
260    /* Does the ppathid exist for this ? we use a memory cache...  In order to
261     * avoid the full loop, we consider that if a dir is allready in the
262     * PathHierarchy table, then there is no need to calculate all the
263     * hierarchy
264     */
265    while (path && *path)
266    {
267       if (!ppathid_cache.lookup(pathid))
268       {
269          Mmsg(mdb->cmd, 
270               "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
271               pathid);
272
273          QUERY_DB(jcr, mdb, mdb->cmd);
274          /* Do we have a result ? */
275          if (sql_num_rows(mdb) > 0) {
276             ppathid_cache.insert(pathid);
277             /* This dir was in the db ...
278              * It means we can leave, the tree has allready been built for
279              * this dir
280              */
281             goto bail_out;
282          } else {
283             /* search or create parent PathId in Path table */
284             mdb->path = bvfs_parent_dir(path);
285             mdb->pnl = strlen(mdb->path);
286             if (!db_create_path_record(jcr, mdb, &parent)) {
287                goto bail_out;
288             }
289             ppathid_cache.insert(pathid);
290             
291             Mmsg(mdb->cmd,
292                  "INSERT INTO PathHierarchy (PathId, PPathId) "
293                  "VALUES (%s,%lld)",
294                  pathid, (uint64_t) parent.PathId);
295             
296             INSERT_DB(jcr, mdb, mdb->cmd);
297
298             edit_uint64(parent.PathId, pathid);
299             path = mdb->path;   /* already done */
300          }
301       } else {
302          /* It's already in the cache.  We can leave, no time to waste here,
303           * all the parent dirs have allready been done
304           */
305          goto bail_out;
306       }
307    }   
308
309 bail_out:
310    mdb->path = bkp;
311    mdb->fnl = 0;
312 }
313
314 /* 
315  * Internal function to update path_hierarchy cache with a shared pathid cache
316  */
317 static void update_path_hierarchy_cache(JCR *jcr,
318                                         B_DB *mdb,
319                                         pathid_cache &ppathid_cache,
320                                         JobId_t JobId)
321 {
322    Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
323
324    uint32_t num;
325    char jobid[50];
326    edit_uint64(JobId, jobid);
327  
328    db_lock(mdb);
329    db_start_transaction(jcr, mdb);
330
331    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
332    
333    if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
334       Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
335       goto bail_out;
336    }
337
338    /* Inserting path records for JobId */
339    Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
340                    "SELECT DISTINCT PathId, JobId "
341                      "FROM (SELECT PathId, JobId FROM File WHERE JobId = %s "
342                            "UNION "
343                            "SELECT PathId, BaseFiles.JobId "
344                              "FROM BaseFiles JOIN File AS F USING (FileId) "
345                             "WHERE BaseFiles.JobId = %s) AS B",
346         jobid, jobid);
347    QUERY_DB(jcr, mdb, mdb->cmd);
348
349    /* Now we have to do the directory recursion stuff to determine missing
350     * visibility We try to avoid recursion, to be as fast as possible We also
351     * only work on not allready hierarchised directories...
352     */
353    Mmsg(mdb->cmd, 
354      "SELECT PathVisibility.PathId, Path "
355        "FROM PathVisibility "
356             "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
357             "LEFT JOIN PathHierarchy "
358          "ON (PathVisibility.PathId = PathHierarchy.PathId) "
359       "WHERE PathVisibility.JobId = %s "
360         "AND PathHierarchy.PathId IS NULL "
361       "ORDER BY Path", jobid);
362    Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
363    QUERY_DB(jcr, mdb, mdb->cmd);
364
365    /* TODO: I need to reuse the DB connection without emptying the result 
366     * So, now i'm copying the result in memory to be able to query the
367     * catalog descriptor again.
368     */
369    num = sql_num_rows(mdb);
370    if (num > 0) {
371       char **result = (char **)malloc (num * 2 * sizeof(char *));
372       
373       SQL_ROW row;
374       int i=0;
375       while((row = sql_fetch_row(mdb))) {
376          result[i++] = bstrdup(row[0]);
377          result[i++] = bstrdup(row[1]);
378       }
379       
380       i=0;
381       while (num > 0) {
382          build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
383          free(result[i++]);
384          free(result[i++]);
385          num--;
386       }
387       free(result);
388    }
389
390    if (mdb->db_get_type_index() == SQL_TYPE_SQLITE3) {
391       Mmsg(mdb->cmd, 
392  "INSERT INTO PathVisibility (PathId, JobId) "
393    "SELECT DISTINCT h.PPathId AS PathId, %s "
394      "FROM PathHierarchy AS h "
395     "WHERE h.PathId IN (SELECT PathId FROM PathVisibility WHERE JobId=%s) "
396       "AND h.PPathId NOT IN (SELECT PathId FROM PathVisibility WHERE JobId=%s)",
397            jobid, jobid, jobid );
398
399    } else {
400       Mmsg(mdb->cmd, 
401   "INSERT INTO PathVisibility (PathId, JobId)  "
402    "SELECT a.PathId,%s "
403    "FROM ( "
404      "SELECT DISTINCT h.PPathId AS PathId "
405        "FROM PathHierarchy AS h "
406        "JOIN  PathVisibility AS p ON (h.PathId=p.PathId) "
407       "WHERE p.JobId=%s) AS a LEFT JOIN "
408        "(SELECT PathId "
409           "FROM PathVisibility "
410          "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
411    "WHERE b.PathId IS NULL",  jobid, jobid, jobid);
412    }
413
414    do {
415       QUERY_DB(jcr, mdb, mdb->cmd);
416    } while (sql_affected_rows(mdb) > 0);
417    
418    Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
419    UPDATE_DB(jcr, mdb, mdb->cmd);
420
421 bail_out:
422    db_end_transaction(jcr, mdb);
423    db_unlock(mdb);
424 }
425
426 /* 
427  * Find an store the filename descriptor for empty directories Filename.Name=''
428  */
429 DBId_t Bvfs::get_dir_filenameid()
430 {
431    uint32_t id;
432    if (dir_filenameid) {
433       return dir_filenameid;
434    }
435    POOL_MEM q;
436    Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
437    db_sql_query(db, q.c_str(), db_int_handler, &id);
438    dir_filenameid = id;
439    return dir_filenameid;
440 }
441
442 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
443 {
444    uint32_t nb=0;
445    db_list_ctx jobids_list;
446
447    db_lock(mdb);
448
449 #ifdef xxx
450    /* TODO: Remove this code when updating make_bacula_table script */
451    Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE HasCache<>2 LIMIT 1");
452    if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
453       Dmsg0(dbglevel, "Creating cache table\n");
454       Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
455       QUERY_DB(jcr, mdb, mdb->cmd);
456
457       Mmsg(mdb->cmd,
458            "CREATE TABLE PathHierarchy ( "
459            "PathId integer NOT NULL, "
460            "PPathId integer NOT NULL, "
461            "CONSTRAINT pathhierarchy_pkey "
462            "PRIMARY KEY (PathId))");
463       QUERY_DB(jcr, mdb, mdb->cmd); 
464
465       Mmsg(mdb->cmd,
466            "CREATE INDEX pathhierarchy_ppathid "
467            "ON PathHierarchy (PPathId)");
468       QUERY_DB(jcr, mdb, mdb->cmd);
469
470       Mmsg(mdb->cmd, 
471            "CREATE TABLE PathVisibility ("
472            "PathId integer NOT NULL, "
473            "JobId integer NOT NULL, "
474            "Size int8 DEFAULT 0, "
475            "Files int4 DEFAULT 0, "
476            "CONSTRAINT pathvisibility_pkey "
477            "PRIMARY KEY (JobId, PathId))");
478       QUERY_DB(jcr, mdb, mdb->cmd);
479
480       Mmsg(mdb->cmd, 
481            "CREATE INDEX pathvisibility_jobid "
482            "ON PathVisibility (JobId)");
483       QUERY_DB(jcr, mdb, mdb->cmd);
484
485    }
486 #endif
487
488    Mmsg(mdb->cmd, 
489  "SELECT JobId from Job "
490   "WHERE HasCache = 0 "
491     "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
492   "ORDER BY JobId");
493
494    db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
495
496    bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
497
498    db_start_transaction(jcr, mdb);
499    Dmsg0(dbglevel, "Cleaning pathvisibility\n");
500    Mmsg(mdb->cmd, 
501         "DELETE FROM PathVisibility "
502          "WHERE NOT EXISTS "
503         "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
504    nb = DELETE_DB(jcr, mdb, mdb->cmd);
505    Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
506
507    db_end_transaction(jcr, mdb);
508    db_unlock(mdb);
509 }
510
511 /*
512  * Update the bvfs cache for given jobids (1,2,3,4)
513  */
514 void
515 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
516 {
517    pathid_cache ppathid_cache;
518    JobId_t JobId;
519    char *p;
520
521    for (p=jobids; ; ) {
522       int stat = get_next_jobid_from_list(&p, &JobId);
523       if (stat < 0) {
524          return;
525       }
526       if (stat == 0) {
527          break;
528       }
529       Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
530       update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
531    }
532 }
533
534 /* 
535  * Update the bvfs cache for current jobids
536  */
537 void Bvfs::update_cache()
538 {
539    bvfs_update_path_hierarchy_cache(jcr, db, jobids);
540 }
541
542 /* Change the current directory, returns true if the path exists */
543 bool Bvfs::ch_dir(const char *path)
544 {
545    pm_strcpy(db->path, path);
546    db->pnl = strlen(db->path);
547    ch_dir(db_get_path_record(jcr, db)); 
548    return pwd_id != 0;
549 }
550
551 /* 
552  * Get all file versions for a specified client
553  * TODO: Handle basejobs using different client
554  */
555 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
556 {
557    Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
558          (uint64_t)fnid, client);
559    char ed1[50], ed2[50];
560    POOL_MEM q;
561    if (see_copies) {
562       Mmsg(q, " AND Job.Type IN ('C', 'B') ");
563    } else {
564       Mmsg(q, " AND Job.Type = 'B' ");
565    }
566
567    POOL_MEM query;
568
569    Mmsg(query,//    1           2              3       
570 "SELECT 'V', File.PathId, File.FilenameId,  File.Md5, "
571 //         4          5           6
572         "File.JobId, File.LStat, File.FileId, "
573 //         7                    8
574        "Media.VolumeName, Media.InChanger "
575 "FROM File, Job, Client, JobMedia, Media "
576 "WHERE File.FilenameId = %s "
577   "AND File.PathId=%s "
578   "AND File.JobId = Job.JobId "
579   "AND Job.JobId = JobMedia.JobId "
580   "AND File.FileIndex >= JobMedia.FirstIndex "
581   "AND File.FileIndex <= JobMedia.LastIndex "
582   "AND JobMedia.MediaId = Media.MediaId "
583   "AND Job.ClientId = Client.ClientId "
584   "AND Client.Name = '%s' "
585   "%s ORDER BY FileId LIMIT %d OFFSET %d"
586         ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
587         limit, offset);
588    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
589    db_sql_query(db, query.c_str(), list_entries, user_data);
590 }
591
592 DBId_t Bvfs::get_root()
593 {
594    *db->path = 0;
595    return db_get_path_record(jcr, db);
596 }
597
598 static int path_handler(void *ctx, int fields, char **row)
599 {
600    Bvfs *fs = (Bvfs *) ctx;
601    return fs->_handle_path(ctx, fields, row);
602 }
603
604 int Bvfs::_handle_path(void *ctx, int fields, char **row)
605 {
606    if (bvfs_is_dir(row)) {
607       /* can have the same path 2 times */
608       if (strcmp(row[BVFS_Name], prev_dir)) {
609          pm_strcpy(prev_dir, row[BVFS_Name]);
610          return list_entries(user_data, fields, row);
611       }
612    }
613    return 0;
614 }
615
616 /* 
617  * Retrieve . and .. information
618  */
619 void Bvfs::ls_special_dirs()
620 {
621    Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
622    char ed1[50], ed2[50];
623    if (*jobids == 0) {
624       return;
625    }
626    if (!dir_filenameid) {
627       get_dir_filenameid();
628    }
629
630    /* Will fetch directories  */
631    *prev_dir = 0;
632
633    POOL_MEM query;
634    Mmsg(query, 
635 "(SELECT PPathId AS PathId, '..' AS Path "
636     "FROM  PathHierarchy "
637    "WHERE  PathId = %s "
638 "UNION "
639  "SELECT %s AS PathId, '.' AS Path)",
640         edit_uint64(pwd_id, ed1), ed1);
641
642    POOL_MEM query2;
643    Mmsg(query2,// 1      2     3        4     5       6
644 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
645   "FROM %s AS tmp  LEFT JOIN ( " // get attributes if any
646        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
647               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
648        "WHERE File1.FilenameId = %s "
649        "AND File1.JobId IN (%s)) AS listfile1 "
650   "ON (tmp.PathId = listfile1.PathId) "
651   "ORDER BY tmp.Path, JobId DESC ",
652         query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
653
654    Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
655    db_sql_query(db, query2.c_str(), path_handler, this);
656 }
657
658 /* Returns true if we have dirs to read */
659 bool Bvfs::ls_dirs()
660 {
661    Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
662    char ed1[50], ed2[50];
663    if (*jobids == 0) {
664       return false;
665    }
666
667    POOL_MEM query;
668    POOL_MEM filter;
669    if (*pattern) {
670       Mmsg(filter, " AND Path2.Path %s '%s' ", 
671            match_query[db_get_type_index(db)], pattern);
672    }
673
674    if (!dir_filenameid) {
675       get_dir_filenameid();
676    }
677
678    /* the sql query displays same directory multiple time, take the first one */
679    *prev_dir = 0;
680
681    /* Let's retrieve the list of the visible dirs in this dir ...
682     * First, I need the empty filenameid to locate efficiently
683     * the dirs in the file table
684     * my $dir_filenameid = $self->get_dir_filenameid();
685     */
686    /* Then we get all the dir entries from File ... */
687    Mmsg(query,
688 //       0     1     2   3      4     5       6
689 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
690     "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
691            "lower(Path1.Path) AS lpath, "
692            "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
693            "listfile1.FileId AS FileId "
694     "FROM ( "
695       "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
696       "FROM PathHierarchy AS PathHierarchy1 "
697       "JOIN Path AS Path2 "
698         "ON (PathHierarchy1.PathId = Path2.PathId) "
699       "JOIN PathVisibility AS PathVisibility1 "
700         "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
701       "WHERE PathHierarchy1.PPathId = %s "
702       "AND PathVisibility1.JobId IN (%s) "
703            "%s "
704      ") AS listpath1 "
705    "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
706
707    "LEFT JOIN ( " /* get attributes if any */
708        "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
709               "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
710        "WHERE File1.FilenameId = %s "
711        "AND File1.JobId IN (%s)) AS listfile1 "
712        "ON (listpath1.PathId = listfile1.PathId) "
713     ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
714         edit_uint64(pwd_id, ed1),
715         jobids,
716         filter.c_str(),
717         edit_uint64(dir_filenameid, ed2),
718         jobids,
719         limit, offset);
720
721    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
722
723    db_lock(db);
724    db_sql_query(db, query.c_str(), path_handler, this);
725    nb_record = sql_num_rows(db);
726    db_unlock(db);
727
728    return nb_record == limit;
729 }
730
731 void build_ls_files_query(B_DB *db, POOL_MEM &query, 
732                           const char *JobId, const char *PathId,  
733                           const char *filter, int64_t limit, int64_t offset)
734 {
735    if (db_get_type_index(db) == SQL_TYPE_POSTGRESQL) {
736       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
737            JobId, PathId, JobId, PathId, 
738            filter, limit, offset);
739    } else {
740       Mmsg(query, sql_bvfs_list_files[db_get_type_index(db)], 
741            JobId, PathId, JobId, PathId, 
742            limit, offset, filter, JobId, JobId);
743    }
744 }
745
746 /* Returns true if we have files to read */
747 bool Bvfs::ls_files()
748 {
749    POOL_MEM query;
750    POOL_MEM filter;
751    char pathid[50];
752
753    Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
754    if (*jobids == 0) {
755       return false;
756    }
757
758    if (!pwd_id) {
759       ch_dir(get_root());
760    }
761
762    edit_uint64(pwd_id, pathid);
763    if (*pattern) {
764       Mmsg(filter, " AND Filename.Name %s '%s' ", 
765            match_query[db_get_type_index(db)], pattern);
766    }
767
768    build_ls_files_query(db, query, 
769                         jobids, pathid, filter.c_str(),
770                         limit, offset);
771
772    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
773
774    db_lock(db);
775    db_sql_query(db, query.c_str(), list_entries, user_data);
776    nb_record = sql_num_rows(db);
777    db_unlock(db);
778
779    return nb_record == limit;
780 }
781
782
783 /* 
784  * Return next Id from comma separated list   
785  *
786  * Returns:
787  *   1 if next Id returned
788  *   0 if no more Ids are in list
789  *  -1 there is an error
790  * TODO: merge with get_next_jobid_from_list() and get_next_dbid_from_list()
791  */
792 static int get_next_id_from_list(char **p, int64_t *Id)
793 {
794    const int maxlen = 30;
795    char id[maxlen+1];
796    char *q = *p;
797
798    id[0] = 0;
799    for (int i=0; i<maxlen; i++) {
800       if (*q == 0) {
801          break;
802       } else if (*q == ',') {
803          q++;
804          break;
805       }
806       id[i] = *q++;
807       id[i+1] = 0;
808    }
809    if (id[0] == 0) {
810       return 0;
811    } else if (!is_a_number(id)) {
812       return -1;                      /* error */
813    }
814    *p = q;
815    *Id = str_to_int64(id);
816    return 1;
817 }
818
819 static int get_path_handler(void *ctx, int fields, char **row)
820 {
821    POOL_MEM *buf = (POOL_MEM *) ctx;
822    pm_strcpy(*buf, row[0]);
823    return 0;
824 }
825
826 static bool check_temp(char *output_table)
827 {
828    if (output_table[0] == 'b' &&
829        output_table[1] == '2' &&
830        is_an_integer(output_table + 2))
831    {
832       return true;
833    }
834    return false;
835 }
836
837 void Bvfs::clear_cache()
838 {
839    db_sql_query(db, "BEGIN",                     NULL, NULL);
840    db_sql_query(db, "UPDATE Job SET HasCache=0", NULL, NULL);
841    db_sql_query(db, "TRUNCATE PathHierarchy",    NULL, NULL);
842    db_sql_query(db, "TRUNCATE PathVisibility",   NULL, NULL);
843    db_sql_query(db, "COMMIT",                    NULL, NULL);
844 }
845
846 bool Bvfs::drop_restore_list(char *output_table)
847 {
848    POOL_MEM query;
849    if (check_temp(output_table)) {
850       Mmsg(query, "DROP TABLE %s", output_table);
851       db_sql_query(db, query.c_str(), NULL, NULL);
852       return true;
853    }
854    return false;
855 }
856
857 bool Bvfs::compute_restore_list(char *fileid, char *dirid, char *hardlink, 
858                                 char *output_table)
859 {
860    POOL_MEM query;
861    POOL_MEM tmp, tmp2;
862    int64_t id, jobid;
863    bool init=false;
864    bool ret=false;
865    /* check args */
866    if ((*fileid   && !is_a_number_list(fileid))  ||
867        (*dirid    && !is_a_number_list(dirid))   ||
868        (*hardlink && !is_a_number_list(hardlink))||
869        (!*hardlink && !*fileid && !*dirid && !*hardlink))
870    {
871       return false;
872    }
873    if (!check_temp(output_table)) {
874       return false;
875    }
876
877    Mmsg(query, "CREATE TABLE btemp%s AS ", output_table);
878
879    if (*fileid) {               /* Select files with their direct id */
880       init=true;
881       Mmsg(tmp,"SELECT JobId, JobTDate, FileIndex, FilenameId, PathId, FileId "
882                   "FROM File JOIN Job USING (JobId) WHERE FileId IN (%s)",
883            fileid);
884       pm_strcat(query, tmp.c_str());
885    }
886
887    /* Add a directory content */
888    while (get_next_id_from_list(&dirid, &id) == 1) {
889       Mmsg(tmp, "SELECT Path FROM Path WHERE PathId=%lld", id);
890       
891       if (!db_sql_query(db, tmp.c_str(), get_path_handler, (void *)&tmp2)) {
892          Dmsg0(dbglevel, "Can't search for path\n");
893          /* print error */
894          return false;
895       }
896       if (!strcmp(tmp2.c_str(), "")) { /* path not found */
897          Dmsg3(dbglevel, "Path not found %lld q=%s s=%s\n",
898                id, tmp.c_str(), tmp2.c_str());
899          break;
900       }
901       /* escape % and _ for LIKE search */
902       tmp.check_size((strlen(tmp2.c_str())+1) * 2);
903       char *p = tmp.c_str();
904       for (char *s = tmp2.c_str(); *s ; s++) {
905          if (*s == '%' || *s == '_' || *s == '\\') {
906             *p = '\\'; 
907             p++;
908          }
909          *p = *s; 
910          p++;
911       }
912       *p = '\0';
913       tmp.strcat("%");
914
915       size_t len = strlen(tmp.c_str());
916       tmp2.check_size((len+1) * 2);
917       db_escape_string(jcr, db, tmp2.c_str(), tmp.c_str(), len);
918
919       if (init) {
920          query.strcat(" UNION ");
921       }
922
923       Mmsg(tmp, "SELECT JobId, JobTDate, File.FileIndex, File.FilenameId, "
924                         "File.PathId, FileId "
925                    "FROM Path JOIN File USING (PathId) JOIN Job USING (JobId) "
926                   "WHERE Path.Path LIKE '%s' AND File.JobId IN (%s) ", 
927            tmp2.c_str(), jobids); 
928       query.strcat(tmp.c_str());
929       init = true;
930
931       query.strcat(" UNION ");
932
933       /* A directory can have files from a BaseJob */
934       Mmsg(tmp, "SELECT File.JobId, JobTDate, BaseFiles.FileIndex, "
935                         "File.FilenameId, File.PathId, BaseFiles.FileId "
936                    "FROM BaseFiles "
937                         "JOIN File USING (FileId) "
938                         "JOIN Job ON (BaseFiles.JobId = Job.JobId) "
939                         "JOIN Path USING (PathId) "
940                   "WHERE Path.Path LIKE '%s' AND BaseFiles.JobId IN (%s) ", 
941            tmp2.c_str(), jobids); 
942       query.strcat(tmp.c_str());
943    }
944
945    /* expect jobid,fileindex */
946    int64_t prev_jobid=0;
947    while (get_next_id_from_list(&hardlink, &jobid) == 1) {
948       if (get_next_id_from_list(&hardlink, &id) != 1) {
949          Dmsg0(dbglevel, "hardlink should be two by two\n");
950          return false;
951       }
952       if (jobid != prev_jobid) { /* new job */
953          if (prev_jobid == 0) {  /* first jobid */
954             if (init) {
955                query.strcat(" UNION ");
956             }
957          } else {               /* end last job, start new one */
958             tmp.strcat(") UNION ");
959             query.strcat(tmp.c_str());
960          }
961          Mmsg(tmp,   "SELECT JobId, JobTDate, FileIndex, FilenameId, "
962                             "PathId, FileId "
963                        "FROM File JOIN Job USING (JobId) WHERE JobId = %lld " 
964                         "AND FileIndex IN (%lld", jobid, id);
965          prev_jobid = jobid;
966
967       } else {                  /* same job, add new findex */
968          Mmsg(tmp2, ", %lld", id);
969          tmp.strcat(tmp2.c_str());
970       }
971    }
972
973    if (prev_jobid != 0) {       /* end last job */
974       tmp.strcat(") ");
975       query.strcat(tmp.c_str());
976       init = true;
977    }
978
979    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
980
981    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
982       Dmsg0(dbglevel, "Can't execute q\n");
983       goto bail_out;
984    }
985
986    /* TODO: handle basejob and SQLite3 */
987    Mmsg(query, sql_bvfs_select[db_get_type_index(db)], output_table, output_table);
988
989    /* TODO: handle jobid filter */
990    Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
991    if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
992       Dmsg0(dbglevel, "Can't execute q\n");
993       goto bail_out;
994    }
995
996    /* MySQL need it */
997    if (db_get_type_index(db) == SQL_TYPE_MYSQL) {
998       Mmsg(query, "CREATE INDEX idx_%s ON b2%s (JobId)", 
999            output_table, output_table);
1000       Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
1001       if (!db_sql_query(db, query.c_str(), NULL, NULL)) {
1002          Dmsg0(dbglevel, "Can't execute q\n");
1003          goto bail_out;
1004       }
1005    }
1006
1007    ret = true;
1008
1009 bail_out:
1010    Mmsg(query, "DROP TABLE btemp%s", output_table);
1011    db_sql_query(db, query.c_str(), NULL, NULL);
1012    return ret;
1013 }
1014
1015 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI */