2 Bacula® - The Network Backup Solution
4 Copyright (C) 2009-2009 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation, which is
11 listed in the file LICENSE.
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 #define __SQL_C /* indicate that this is sql.c */
32 #include "cats/cats.h"
33 #include "lib/htable.h"
37 #define dbglevel_sql 15
39 static int result_handler(void *ctx, int fields, char **row)
42 Pmsg4(0, "%s\t%s\t%s\t%s\n",
43 row[0], row[1], row[2], row[3]);
44 } else if (fields == 5) {
45 Pmsg5(0, "%s\t%s\t%s\t%s\t%s\n",
46 row[0], row[1], row[2], row[3], row[4]);
47 } else if (fields == 6) {
48 Pmsg6(0, "%s\t%s\t%s\t%s\t%s\t%s\n",
49 row[0], row[1], row[2], row[3], row[4], row[5]);
50 } else if (fields == 7) {
51 Pmsg7(0, "%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
52 row[0], row[1], row[2], row[3], row[4], row[5], row[6]);
57 Bvfs::Bvfs(JCR *j, B_DB *mdb) {
60 db = mdb; /* need to inc ref count */
61 jobids = get_pool_memory(PM_NAME);
62 prev_dir = get_pool_memory(PM_NAME);
63 pattern = get_pool_memory(PM_NAME);
64 *jobids = *prev_dir = *pattern = 0;
65 dir_filenameid = pwd_id = offset = 0;
66 see_copies = see_all_version = false;
69 list_entries = result_handler;
74 free_pool_memory(jobids);
75 free_pool_memory(pattern);
76 free_pool_memory(prev_dir);
82 * TODO: Find a way to let the user choose how he wants to display
83 * files and directories
88 * Working Object to store PathId already seen (avoid
89 * database queries), equivalent to %cache_ppathid in perl
98 htable *cache_ppathid;
103 cache_ppathid = (htable *)malloc(sizeof(htable));
104 cache_ppathid->init(&link, &link, NITEMS);
106 nodes = (hlink *) malloc(max_node * sizeof (hlink));
111 if (nb_node >= max_node) {
113 nodes = (hlink *)brealloc(nodes, sizeof(hlink) * max_node);
115 return nodes + nb_node++;
118 bool lookup(char *pathid) {
119 bool ret = cache_ppathid->lookup(pathid) != NULL;
123 void insert(char *pathid) {
124 hlink *h = get_hlink();
125 cache_ppathid->insert(pathid, h);
129 cache_ppathid->destroy();
134 pathid_cache(const pathid_cache &); /* prohibit pass by value */
135 pathid_cache &operator= (const pathid_cache &);/* prohibit class assignment*/
138 /* Return the parent_dir with the trailing / (update the given string)
139 * TODO: see in the rest of bacula if we don't have already this function
145 char *bvfs_parent_dir(char *path)
148 int len = strlen(path) - 1;
150 if (len >= 0 && path[len] == '/') { /* if directory, skip last / */
156 while (p > path && !IsPathSeparator(*p)) {
164 /* Return the basename of the with the trailing /
165 * TODO: see in the rest of bacula if we don't have
166 * this function already
168 char *bvfs_basename_dir(char *path)
171 int len = strlen(path) - 1;
173 if (path[len] == '/') { /* if directory, skip last / */
179 while (p > path && !IsPathSeparator(*p)) {
183 p++; /* skip first / */
189 static void build_path_hierarchy(JCR *jcr, B_DB *mdb,
190 pathid_cache &ppathid_cache,
191 char *org_pathid, char *path)
193 Dmsg1(dbglevel, "build_path_hierarchy(%s)\n", path);
196 char *bkp = mdb->path;
197 strncpy(pathid, org_pathid, sizeof(pathid));
199 /* Does the ppathid exist for this ? we use a memory cache... In order to
200 * avoid the full loop, we consider that if a dir is allready in the
201 * PathHierarchy table, then there is no need to calculate all the
204 while (path && *path)
206 if (!ppathid_cache.lookup(pathid))
209 "SELECT PPathId FROM PathHierarchy WHERE PathId = %s",
212 QUERY_DB(jcr, mdb, mdb->cmd);
213 /* Do we have a result ? */
214 if (sql_num_rows(mdb) > 0) {
215 ppathid_cache.insert(pathid);
216 /* This dir was in the db ...
217 * It means we can leave, the tree has allready been built for
222 /* search or create parent PathId in Path table */
223 mdb->path = bvfs_parent_dir(path);
224 mdb->pnl = strlen(mdb->path);
225 if (!db_create_path_record(jcr, mdb, &parent)) {
228 ppathid_cache.insert(pathid);
231 "INSERT INTO PathHierarchy (PathId, PPathId) "
233 pathid, (uint64_t) parent.PathId);
235 INSERT_DB(jcr, mdb, mdb->cmd);
237 edit_uint64(parent.PathId, pathid);
238 path = mdb->path; /* already done */
241 /* It's already in the cache. We can leave, no time to waste here,
242 * all the parent dirs have allready been done
254 * Internal function to update path_hierarchy cache with a shared pathid cache
256 static void update_path_hierarchy_cache(JCR *jcr,
258 pathid_cache &ppathid_cache,
261 Dmsg0(dbglevel, "update_path_hierarchy_cache()\n");
265 edit_uint64(JobId, jobid);
268 db_start_transaction(jcr, mdb);
270 Mmsg(mdb->cmd, "SELECT 1 FROM Job WHERE JobId = %s AND HasCache=1", jobid);
272 if (!QUERY_DB(jcr, mdb, mdb->cmd) || sql_num_rows(mdb) > 0) {
273 Dmsg1(dbglevel, "already computed %d\n", (uint32_t)JobId );
277 /* Inserting path records for JobId */
278 Mmsg(mdb->cmd, "INSERT INTO PathVisibility (PathId, JobId) "
279 "SELECT DISTINCT PathId, JobId FROM File WHERE JobId = %s",
281 QUERY_DB(jcr, mdb, mdb->cmd);
284 /* Now we have to do the directory recursion stuff to determine missing
285 * visibility We try to avoid recursion, to be as fast as possible We also
286 * only work on not allready hierarchised directories...
289 "SELECT PathVisibility.PathId, Path "
290 "FROM PathVisibility "
291 "JOIN Path ON( PathVisibility.PathId = Path.PathId) "
292 "LEFT JOIN PathHierarchy "
293 "ON (PathVisibility.PathId = PathHierarchy.PathId) "
294 "WHERE PathVisibility.JobId = %s "
295 "AND PathHierarchy.PathId IS NULL "
296 "ORDER BY Path", jobid);
297 Dmsg1(dbglevel_sql, "q=%s\n", mdb->cmd);
298 QUERY_DB(jcr, mdb, mdb->cmd);
300 /* TODO: I need to reuse the DB connection without emptying the result
301 * So, now i'm copying the result in memory to be able to query the
302 * catalog descriptor again.
304 num = sql_num_rows(mdb);
306 char **result = (char **)malloc (num * 2 * sizeof(char *));
310 while((row = sql_fetch_row(mdb))) {
311 result[i++] = bstrdup(row[0]);
312 result[i++] = bstrdup(row[1]);
317 build_path_hierarchy(jcr, mdb, ppathid_cache, result[i], result[i+1]);
326 "INSERT INTO PathVisibility (PathId, JobId) "
327 "SELECT a.PathId,%s "
329 "SELECT DISTINCT h.PPathId AS PathId "
330 "FROM PathHierarchy AS h "
331 "JOIN PathVisibility AS p ON (h.PathId=p.PathId) "
332 "WHERE p.JobId=%s) AS a LEFT JOIN "
334 "FROM PathVisibility "
335 "WHERE JobId=%s) AS b ON (a.PathId = b.PathId) "
336 "WHERE b.PathId IS NULL", jobid, jobid, jobid);
339 QUERY_DB(jcr, mdb, mdb->cmd);
340 } while (sql_affected_rows(mdb) > 0);
342 Mmsg(mdb->cmd, "UPDATE Job SET HasCache=1 WHERE JobId=%s", jobid);
343 UPDATE_DB(jcr, mdb, mdb->cmd);
346 db_end_transaction(jcr, mdb);
351 * Find an store the filename descriptor for empty directories Filename.Name=''
353 DBId_t Bvfs::get_dir_filenameid()
356 if (dir_filenameid) {
357 return dir_filenameid;
360 Mmsg(q, "SELECT FilenameId FROM Filename WHERE Name = ''");
361 db_sql_query(db, q.c_str(), db_int_handler, &id);
363 return dir_filenameid;
366 void bvfs_update_cache(JCR *jcr, B_DB *mdb)
369 db_list_ctx jobids_list;
372 db_start_transaction(jcr, mdb);
374 Mmsg(mdb->cmd, "SELECT 1 from PathHierarchy LIMIT 1");
375 /* TODO: Add this code in the make_bacula_table script */
376 if (!QUERY_DB(jcr, mdb, mdb->cmd)) {
377 Dmsg0(dbglevel, "Creating cache table\n");
378 Mmsg(mdb->cmd, "ALTER TABLE Job ADD HasCache int DEFAULT 0");
379 QUERY_DB(jcr, mdb, mdb->cmd);
382 "CREATE TABLE PathHierarchy ( "
383 "PathId integer NOT NULL, "
384 "PPathId integer NOT NULL, "
385 "CONSTRAINT pathhierarchy_pkey "
386 "PRIMARY KEY (PathId))");
387 QUERY_DB(jcr, mdb, mdb->cmd);
390 "CREATE INDEX pathhierarchy_ppathid "
391 "ON PathHierarchy (PPathId)");
392 QUERY_DB(jcr, mdb, mdb->cmd);
395 "CREATE TABLE PathVisibility ("
396 "PathId integer NOT NULL, "
397 "JobId integer NOT NULL, "
398 "Size int8 DEFAULT 0, "
399 "Files int4 DEFAULT 0, "
400 "CONSTRAINT pathvisibility_pkey "
401 "PRIMARY KEY (JobId, PathId))");
402 QUERY_DB(jcr, mdb, mdb->cmd);
405 "CREATE INDEX pathvisibility_jobid "
406 "ON PathVisibility (JobId)");
407 QUERY_DB(jcr, mdb, mdb->cmd);
412 "SELECT JobId from Job "
413 "WHERE HashCache = 0 "
414 "AND Type IN ('B') AND JobStatus IN ('T', 'f', 'A') "
417 db_sql_query(mdb, mdb->cmd, db_list_handler, &jobids_list);
419 bvfs_update_path_hierarchy_cache(jcr, mdb, jobids_list.list);
421 db_end_transaction(jcr, mdb);
422 db_start_transaction(jcr, mdb);
423 Dmsg0(dbglevel, "Cleaning pathvisibility\n");
425 "DELETE FROM PathVisibility "
427 "(SELECT 1 FROM Job WHERE JobId=PathVisibility.JobId)");
428 nb = DELETE_DB(jcr, mdb, mdb->cmd);
429 Dmsg1(dbglevel, "Affected row(s) = %d\n", nb);
431 db_end_transaction(jcr, mdb);
435 * Update the bvfs cache for given jobids (1,2,3,4)
438 bvfs_update_path_hierarchy_cache(JCR *jcr, B_DB *mdb, char *jobids)
440 pathid_cache ppathid_cache;
445 int stat = get_next_jobid_from_list(&p, &JobId);
452 Dmsg1(dbglevel, "Updating cache for %lld\n", (uint64_t)JobId);
453 update_path_hierarchy_cache(jcr, mdb, ppathid_cache, JobId);
458 * Update the bvfs cache for current jobids
460 void Bvfs::update_cache()
462 bvfs_update_path_hierarchy_cache(jcr, db, jobids);
465 /* Change the current directory, returns true if the path exists */
466 bool Bvfs::ch_dir(const char *path)
468 pm_strcpy(db->path, path);
469 db->pnl = strlen(db->path);
470 ch_dir(db_get_path_record(jcr, db));
475 * Get all file versions for a specified client
477 void Bvfs::get_all_file_versions(DBId_t pathid, DBId_t fnid, const char *client)
479 Dmsg3(dbglevel, "get_all_file_versions(%lld, %lld, %s)\n", (uint64_t)pathid,
480 (uint64_t)fnid, client);
481 char ed1[50], ed2[50];
484 Mmsg(q, " AND Job.Type IN ('C', 'B') ");
486 Mmsg(q, " AND Job.Type = 'B' ");
491 Mmsg(query,// 1 2 3 4
492 "SELECT 'V', File.FileId, File.Md5, File.JobId, File.LStat, "
494 "Media.VolumeName, Media.InChanger "
495 "FROM File, Job, Client, JobMedia, Media "
496 "WHERE File.FilenameId = %s "
497 "AND File.PathId=%s "
498 "AND File.JobId = Job.JobId "
499 "AND Job.ClientId = Client.ClientId "
500 "AND Job.JobId = JobMedia.JobId "
501 "AND File.FileIndex >= JobMedia.FirstIndex "
502 "AND File.FileIndex <= JobMedia.LastIndex "
503 "AND JobMedia.MediaId = Media.MediaId "
504 "AND Client.Name = '%s' "
505 "%s ORDER BY FileId LIMIT %d OFFSET %d"
506 ,edit_uint64(fnid, ed1), edit_uint64(pathid, ed2), client, q.c_str(),
508 Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
509 db_sql_query(db, query.c_str(), list_entries, user_data);
512 DBId_t Bvfs::get_root()
515 return db_get_path_record(jcr, db);
518 static int path_handler(void *ctx, int fields, char **row)
520 Bvfs *fs = (Bvfs *) ctx;
521 return fs->_handle_path(ctx, fields, row);
524 int Bvfs::_handle_path(void *ctx, int fields, char **row)
526 if (bvfs_is_dir(row)) {
527 /* can have the same path 2 times */
528 if (strcmp(row[BVFS_Name], prev_dir)) {
529 pm_strcpy(prev_dir, row[BVFS_Name]);
530 return list_entries(user_data, fields, row);
537 * Retrieve . and .. information
539 void Bvfs::ls_special_dirs()
541 Dmsg1(dbglevel, "ls_special_dirs(%lld)\n", (uint64_t)pwd_id);
542 char ed1[50], ed2[50];
546 if (!dir_filenameid) {
547 get_dir_filenameid();
550 /* Will fetch directories */
555 "((SELECT PPathId AS PathId, '..' AS Path "
556 "FROM PathHierarchy "
557 "WHERE PathId = %s) "
559 "(SELECT %s AS PathId, '.' AS Path))",
560 edit_uint64(pwd_id, ed1), ed1);
563 Mmsg(query2,// 1 2 3 4 5 6
564 "SELECT 'D', tmp.PathId, 0, tmp.Path, JobId, LStat, FileId "
565 "FROM %s AS tmp LEFT JOIN ( " // get attributes if any
566 "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
567 "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
568 "WHERE File1.FilenameId = %s "
569 "AND File1.JobId IN (%s)) AS listfile1 "
570 "ON (tmp.PathId = listfile1.PathId) "
571 "ORDER BY tmp.Path, JobId DESC ",
572 query.c_str(), edit_uint64(dir_filenameid, ed2), jobids);
574 Dmsg1(dbglevel_sql, "q=%s\n", query2.c_str());
575 db_sql_query(db, query2.c_str(), path_handler, this);
578 /* Returns true if we have dirs to read */
581 Dmsg1(dbglevel, "ls_dirs(%lld)\n", (uint64_t)pwd_id);
582 char ed1[50], ed2[50];
589 Mmsg(filter, " AND Path2.Path %s '%s' ", SQL_MATCH, pattern);
592 if (!dir_filenameid) {
593 get_dir_filenameid();
596 /* the sql query displays same directory multiple time, take the first one */
599 /* Let's retrieve the list of the visible dirs in this dir ...
600 * First, I need the empty filenameid to locate efficiently
601 * the dirs in the file table
602 * my $dir_filenameid = $self->get_dir_filenameid();
604 /* Then we get all the dir entries from File ... */
608 "SELECT 'D', PathId, 0, Path, JobId, LStat, FileId FROM ( "
609 "SELECT Path1.PathId AS PathId, Path1.Path AS Path, "
610 "lower(Path1.Path) AS lpath, "
611 "listfile1.JobId AS JobId, listfile1.LStat AS LStat, "
612 "listfile1.FileId AS FileId "
614 "SELECT DISTINCT PathHierarchy1.PathId AS PathId "
615 "FROM PathHierarchy AS PathHierarchy1 "
616 "JOIN Path AS Path2 "
617 "ON (PathHierarchy1.PathId = Path2.PathId) "
618 "JOIN PathVisibility AS PathVisibility1 "
619 "ON (PathHierarchy1.PathId = PathVisibility1.PathId) "
620 "WHERE PathHierarchy1.PPathId = %s "
621 "AND PathVisibility1.jobid IN (%s) "
624 "JOIN Path AS Path1 ON (listpath1.PathId = Path1.PathId) "
626 "LEFT JOIN ( " /* get attributes if any */
627 "SELECT File1.PathId AS PathId, File1.JobId AS JobId, "
628 "File1.LStat AS LStat, File1.FileId AS FileId FROM File AS File1 "
629 "WHERE File1.FilenameId = %s "
630 "AND File1.JobId IN (%s)) AS listfile1 "
631 "ON (listpath1.PathId = listfile1.PathId) "
632 ") AS A ORDER BY 2,3 DESC LIMIT %d OFFSET %d",
633 edit_uint64(pwd_id, ed1),
636 edit_uint64(dir_filenameid, ed2),
640 Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
643 db_sql_query(db, query.c_str(), path_handler, this);
644 nb_record = db->num_rows;
647 return nb_record == limit;
650 /* Returns true if we have files to read */
651 bool Bvfs::ls_files()
653 Dmsg1(dbglevel, "ls_files(%lld)\n", (uint64_t)pwd_id);
665 Mmsg(filter, " AND Filename.Name %s '%s' ", SQL_MATCH, pattern);
669 Mmsg(query, // 1 2 3 4
670 "SELECT 'F', File.PathId, File.FilenameId, listfiles.Name, File.JobId, "
671 "File.LStat, listfiles.id "
673 "SELECT Filename.Name as Name, max(File.FileId) as id "
674 "FROM File, Filename "
675 "WHERE File.FilenameId = Filename.FilenameId "
676 "AND Filename.Name != '' "
677 "AND File.PathId = %s "
678 "AND File.JobId IN (%s) "
680 "GROUP BY Filename.Name "
681 "ORDER BY Filename.Name LIMIT %d OFFSET %d "
683 "WHERE File.FileId = listfiles.id",
684 edit_uint64(pwd_id, ed1),
689 Dmsg1(dbglevel_sql, "q=%s\n", query.c_str());
692 db_sql_query(db, query.c_str(), list_entries, user_data);
693 nb_record = db->num_rows;
696 return nb_record == limit;