2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database interface routines
31 * Almost generic set of SQL database interface routines
32 * (with a little more work)
33 * SQL engine specific routines are in mysql.c, postgresql.c,
36 * Kern Sibbald, March 2000
38 * Version $Id: sql.c 8034 2008-11-11 14:33:46Z ricozz $
41 /* The following is necessary so that we do not include
42 * the dummy external definition of B_DB.
44 #define __SQL_C /* indicate that this is sql.c */
49 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
/* Catalog schema version most recently read from the database; it is
 * reset to 0 and filled in by check_tables_version(). */
51 uint32_t bacula_db_version = 0;
/* Index used to pick the per-engine entry of tables such as
 * sql_get_max_connections[]; stays -1 until db_init() assigns one of the
 * SQL_TYPE_ values. */
53 int db_type = -1; /* SQL engine type index */
55 /* Forward referenced subroutines */
/* NOTE(review): no definition of print_dashes() or print_result() appears
 * in the visible part of this file (the listing helpers defined here are
 * list_dashes() and list_result()) -- confirm these prototypes are needed. */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
/*
 * Select the SQL engine type (global db_type) and create the catalog
 * database handle.
 *
 * In the driver-string section, db_driver must be at least five characters
 * long and start with "dbi:" (prefix compared case-insensitively); the text
 * after the prefix is matched case-insensitively against mysql, postgresql,
 * sqlite, sqlite3 and ingres. The visible M_ABORT messages cover an
 * unspecified driver, a string not of the form dbi:<type>, and an unknown
 * type. The remaining assignments set db_type to one fixed SQL_TYPE_ value
 * each; the preprocessor conditionals that choose among all of these
 * sections are not shown in this listing.
 *
 * db_name, db_user, db_password, db_address, db_port, db_socket and
 * mult_db_connections are passed unchanged to db_init_database(), whose
 * result is returned.
 */
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60 const char *db_password, const char *db_address, int db_port,
61 const char *db_socket, int mult_db_connections)
66 Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
68 if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69 Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
71 p = (char *)(db_driver + 4);
72 if (strcasecmp(p, "mysql") == 0) {
73 db_type = SQL_TYPE_MYSQL;
74 } else if (strcasecmp(p, "postgresql") == 0) {
75 db_type = SQL_TYPE_POSTGRESQL;
76 } else if (strcasecmp(p, "sqlite") == 0) {
77 db_type = SQL_TYPE_SQLITE;
78 } else if (strcasecmp(p, "sqlite3") == 0) {
79 db_type = SQL_TYPE_SQLITE3;
80 } else if (strcasecmp(p, "ingres") == 0) {
81 db_type = SQL_TYPE_INGRES;
83 Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
86 db_type = SQL_TYPE_MYSQL;
88 db_type = SQL_TYPE_POSTGRESQL;
90 db_type = SQL_TYPE_INGRES;
92 db_type = SQL_TYPE_SQLITE;
94 db_type = SQL_TYPE_SQLITE3;
97 return db_init_database(jcr, db_name, db_user, db_password, db_address,
98 db_port, db_socket, mult_db_connections);
/*
 * Constructor: zero the whole object with memset(), allocate the DBId array
 * sized max_ids * sizeof(DBId_t), and reset the num_ids, num_seen and
 * tot_ids counters.
 * NOTE(review): if max_ids is a data member, memset() has just cleared it,
 * so it must be assigned on a line not shown in this listing before
 * malloc() uses it -- confirm.
 * NOTE(review): the malloc() result is not checked in the visible lines.
 */
101 dbid_list::dbid_list()
103 memset(this, 0, sizeof(dbid_list));
105 DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
106 num_ids = num_seen = tot_ids = 0;
/* Destructor; its body is not shown in this listing. */
110 dbid_list::~dbid_list()
116 * Called here to retrieve an integer from the database
/*
 * Result handler that stores the first column of a row as an integer.
 *
 * ctx is treated as a uint32_t pointer and receives str_to_int64(row[0]),
 * so the converted value is narrowed to 32 bits on assignment.
 * num_fields is not referenced in the visible lines. The branch that logs
 * "int_handler finds zero" and the return statement are not fully shown in
 * this listing.
 * NOTE(review): the first Dmsg1 prints the pointer row with %x; %p is the
 * conversion that matches a pointer argument.
 */
118 int db_int_handler(void *ctx, int num_fields, char **row)
120 uint32_t *val = (uint32_t *)ctx;
122 Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
125 Dmsg1(800, "int_handler finds '%s'\n", row[0]);
126 *val = str_to_int64(row[0]);
128 Dmsg0(800, "int_handler finds zero\n");
131 Dmsg0(800, "int_handler finishes\n");
136 * Called here to retrieve a 32/64 bit integer from the database.
137 * The returned integer will be extended to 64 bit.
/*
 * Result handler that stores the first column of a row in a db_int64_ctx.
 * ctx must point to a db_int64_ctx; its value member receives
 * str_to_int64(row[0]). Any guard around that assignment and the return
 * statement are not shown in this listing.
 */
139 int db_int64_handler(void *ctx, int num_fields, char **row)
141 db_int64_ctx *lctx = (db_int64_ctx *)ctx;
144 lctx->value = str_to_int64(row[0]);
151 * Used to build a comma-separated list of values from a query, e.g. "10,20,30"
/*
 * Result handler that appends row[0] to the string lctx->list using
 * pm_strcat(). ctx must point to a db_list_ctx.
 * A row is only used when the query returned exactly one column and that
 * column is not NULL. A "," is appended ahead of the value; the condition
 * guarding that separator and the return statement are not shown in this
 * listing.
 */
153 int db_list_handler(void *ctx, int num_fields, char **row)
155 db_list_ctx *lctx = (db_list_ctx *)ctx;
156 if (num_fields == 1 && row[0]) {
158 pm_strcat(lctx->list, ",");
160 pm_strcat(lctx->list, row[0]);
168 * Called here to retrieve the max_connections value from the database
/*
 * Result handler passed to db_sql_query() by db_check_max_connections().
 * Selects the column given by sql_get_max_connections_index[db_type] and
 * stores its str_to_int64() value through ctx, which is treated as a
 * uint32_t pointer (so the value is narrowed to 32 bits).
 * NOTE(review): the debug text says "int_handler", apparently copied from
 * db_int_handler().
 */
170 static int db_max_connections_handler(void *ctx, int num_fields, char **row)
172 uint32_t *val = (uint32_t *)ctx;
173 uint32_t index = sql_get_max_connections_index[db_type];
175 *val = str_to_int64(row[index]);
177 Dmsg0(800, "int_handler finds zero\n");
184 * Check catalog max_connections setting
/*
 * Compare the catalog's max_connections setting with the Director's
 * MaxConcurrentJobs value passed in max_concurrent_jobs.
 *
 * Under HAVE_BATCH_FILE_INSERT, the engine-specific query
 * sql_get_max_connections[db_type] is run with
 * db_max_connections_handler() filling max_conn. A query failure is
 * reported as M_ERROR. When max_conn is non-zero and smaller than
 * max_concurrent_jobs, the "Potential performance problem" text is issued
 * as M_WARNING through mdb->errmsg.
 * The return statements and the end of the conditional section are not
 * shown in this listing.
 * NOTE(review): max_conn and max_concurrent_jobs are uint32_t but are
 * formatted with %d.
 */
186 bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
188 #ifdef HAVE_BATCH_FILE_INSERT
190 uint32_t max_conn = 0;
192 /* With Batch insert, verify max_connections */
193 if (!db_sql_query(mdb, sql_get_max_connections[db_type],
194 db_max_connections_handler, &max_conn)) {
195 Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
198 if (max_conn && max_concurrent_jobs > max_conn) {
200 _("Potential performance problem:\n"
201 "max_connections=%d set for %s database \"%s\" should be larger than Director's "
202 "MaxConcurrentJobs=%d\n"),
203 max_conn, db_get_type(), mdb->db_name, max_concurrent_jobs);
204 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
213 /* NOTE!!! The following routines expect that the
214 * calling subroutine sets and clears the mutex
217 /* Check that the tables correspond to the version we want */
/*
 * Check that the catalog schema version equals BDB_VERSION.
 *
 * Resets the global bacula_db_version to 0, then reads VersionId from the
 * Version table into it using db_int_handler(). A failed query, or a
 * version different from BDB_VERSION, is reported as M_FATAL with the text
 * held in mdb->errmsg.
 * The return statements are not shown in this listing.
 */
218 bool check_tables_version(JCR *jcr, B_DB *mdb)
220 const char *query = "SELECT VersionId FROM Version";
222 bacula_db_version = 0;
223 if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
224 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
227 if (bacula_db_version != BDB_VERSION) {
228 Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
229 mdb->db_name, BDB_VERSION, bacula_db_version);
230 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
236 /* Utility routine for queries. The database MUST be locked before calling here. */
/*
 * Run the query in cmd and store its result set in mdb->result.
 *
 * file and line identify the caller and are forwarded to m_msg() and
 * j_msg(). Any previous result is released first with sql_free_result().
 * If sql_query() returns non-zero, the failure text is built in
 * mdb->errmsg and reported as M_FATAL; the M_INFO message echoing cmd is
 * presumably conditional on a line not shown in this listing, as is the
 * return taken on that failure path.
 * On the path shown, the return value is whether sql_store_result()
 * produced a non-NULL result.
 */
238 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
242 sql_free_result(mdb);
243 if ((status=sql_query(mdb, cmd)) != 0) {
244 m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
245 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
247 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
252 mdb->result = sql_store_result(mdb);
254 return mdb->result != NULL;
258 * Utility routine to do inserts
259 * Returns: 0 on failure
/*
 * Run the INSERT statement in cmd.
 *
 * file and line identify the caller and are forwarded to m_msg() and
 * j_msg(). If sql_query() reports failure, the text is built in
 * mdb->errmsg and reported as M_FATAL. When mdb->have_insert_id is set,
 * mdb->num_rows is taken from sql_affected_rows(); a count other than
 * exactly one produces the "Insertion problem" text in mdb->errmsg.
 * The conditions around the M_INFO echoes of cmd, the alternative to the
 * have_insert_id branch, and the return statements are not shown in this
 * listing.
 */
263 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
265 if (sql_query(mdb, cmd)) {
266 m_msg(file, line, &mdb->errmsg, _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
267 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
269 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
273 if (mdb->have_insert_id) {
274 mdb->num_rows = sql_affected_rows(mdb);
278 if (mdb->num_rows != 1) {
280 m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
281 edit_uint64(mdb->num_rows, ed1));
283 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
291 /* Utility routine for updates.
292 * Returns: 0 on failure
/*
 * Run the UPDATE statement in cmd.
 *
 * file and line identify the caller and are forwarded to m_msg() and
 * j_msg(). If sql_query() reports failure, the text is built in
 * mdb->errmsg and reported as M_ERROR. Otherwise mdb->num_rows is taken
 * from sql_affected_rows(); fewer than one affected row produces the
 * "Update failed" text in mdb->errmsg (the M_INFO echo in that branch is
 * commented out).
 * The condition around the first M_INFO echo and the return statements are
 * not shown in this listing.
 */
296 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
299 if (sql_query(mdb, cmd)) {
300 m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
301 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
303 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
307 mdb->num_rows = sql_affected_rows(mdb);
308 if (mdb->num_rows < 1) {
310 m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
311 edit_uint64(mdb->num_rows, ed1), cmd);
313 // j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
321 /* Utility routine for deletes
323 * Returns: -1 on error
324 * n number of rows affected
/*
 * Run the DELETE statement in cmd.
 *
 * file and line identify the caller and are forwarded to m_msg() and
 * j_msg(). If sql_query() reports failure, the text is built in
 * mdb->errmsg and reported as M_ERROR; the value returned on that path is
 * not shown in this listing. On the path shown, the result of
 * sql_affected_rows() is returned.
 */
327 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
330 if (sql_query(mdb, cmd)) {
331 m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
332 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
334 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
339 return sql_affected_rows(mdb);
344 * Get record max. Query is already in mdb->cmd
347 * Returns: -1 on failure
/*
 * Run the query already stored in mdb->cmd through QUERY_DB and convert
 * the first column of the first fetched row with str_to_int64() into stat.
 *
 * If no row can be fetched, the "error fetching row" text is placed in
 * mdb->errmsg; the same text is set again on a later path whose condition
 * is not shown in this listing (presumably the query-failure branch).
 * The result set is released with sql_free_result() after the fetch.
 * The declaration of stat, the failure values and the return statement are
 * not shown.
 */
350 int get_sql_record_max(JCR *jcr, B_DB *mdb)
355 if (QUERY_DB(jcr, mdb, mdb->cmd)) {
356 if ((row = sql_fetch_row(mdb)) == NULL) {
357 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
360 stat = str_to_int64(row[0]);
362 sql_free_result(mdb);
364 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
371 * Return pre-edited error message
/* Returns a char pointer that callers in this file print with %s as the
 * database error text; the body is not shown in this listing. */
373 char *db_strerror(B_DB *mdb)
379 * Lock database, this can be called multiple times by the same
380 * thread without blocking, but must be unlocked the number of
381 * times it was locked.
/*
 * Take the write lock mdb->lock via rwl_writelock_p(), passing along the
 * caller's file and line. A non-zero status is reported through e_msg() as
 * M_FATAL together with its text from be.bstrerror(); the declarations of
 * errstat and be are not shown in this listing.
 */
383 void _db_lock(const char *file, int line, B_DB *mdb)
386 if ((errstat=rwl_writelock_p(&mdb->lock, file, line)) != 0) {
388 e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
389 errstat, be.bstrerror(errstat));
394 * Unlock the database. This can be called multiple times by the
395 * same thread up to the number of times that thread called
/*
 * Release the write lock mdb->lock via rwl_writeunlock(). A non-zero
 * status is reported through e_msg() as M_FATAL, tagged with the caller's
 * file and line, together with its text from be.bstrerror(); the
 * declarations of errstat and be are not shown in this listing.
 */
398 void _db_unlock(const char *file, int line, B_DB *mdb)
401 if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
403 e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
404 errstat, be.bstrerror(errstat));
409 * Start a transaction. This groups inserts and makes things
410 * much more efficient. Usually started when inserting
/*
 * Begin a database transaction if one is not already open.
 *
 * The visible lines first set jcr->attr (pool memory, PM_FNAME) and
 * jcr->ar (an ATTR_DBR from malloc()); any guards around those two
 * allocations are not shown in this listing.
 *
 * Several engine-specific sections follow, all with the same shape:
 *   - each first tests !mdb->allow_transactions (the branch body is not
 *     shown);
 *   - an open transaction with more than 10,000 changes (SQLite sections)
 *     or 25,000 changes (PostgreSQL and Ingres sections) is ended with
 *     db_end_transaction();
 *   - if no transaction is open, "BEGIN" is sent and mdb->transaction is
 *     set to 1.
 * The first SQLite section sends BEGIN with my_sqlite_query(); the others
 * use db_sql_query(). The last two sections are selected at run time from
 * db_type. Of the compile-time selection, only the HAVE_POSTGRESQL
 * conditional is visible here.
 * NOTE(review): the debug text spells the engine "PosgreSQL".
 */
413 void db_start_transaction(JCR *jcr, B_DB *mdb)
416 jcr->attr = get_pool_memory(PM_FNAME);
419 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
423 if (!mdb->allow_transactions) {
427 /* Allow only 10,000 changes per transaction */
428 if (mdb->transaction && mdb->changes > 10000) {
429 db_end_transaction(jcr, mdb);
431 if (!mdb->transaction) {
432 my_sqlite_query(mdb, "BEGIN"); /* begin transaction */
433 Dmsg0(400, "Start SQLite transaction\n");
434 mdb->transaction = 1;
440 * This is turned off because transactions break
441 * if multiple simultaneous jobs are run.
443 #ifdef HAVE_POSTGRESQL
444 if (!mdb->allow_transactions) {
448 /* Allow only 25,000 changes per transaction */
449 if (mdb->transaction && mdb->changes > 25000) {
450 db_end_transaction(jcr, mdb);
452 if (!mdb->transaction) {
453 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
454 Dmsg0(400, "Start PosgreSQL transaction\n");
455 mdb->transaction = 1;
461 if (!mdb->allow_transactions) {
465 /* Allow only 25,000 changes per transaction */
466 if (mdb->transaction && mdb->changes > 25000) {
467 db_end_transaction(jcr, mdb);
469 if (!mdb->transaction) {
470 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
471 Dmsg0(400, "Start Ingres transaction\n");
472 mdb->transaction = 1;
478 if (db_type == SQL_TYPE_SQLITE) {
479 if (!mdb->allow_transactions) {
483 /* Allow only 10,000 changes per transaction */
484 if (mdb->transaction && mdb->changes > 10000) {
485 db_end_transaction(jcr, mdb);
487 if (!mdb->transaction) {
488 //my_sqlite_query(mdb, "BEGIN"); /* begin transaction */
489 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
490 Dmsg0(400, "Start SQLite transaction\n");
491 mdb->transaction = 1;
494 } else if (db_type == SQL_TYPE_POSTGRESQL) {
495 if (!mdb->allow_transactions) {
499 /* Allow only 25,000 changes per transaction */
500 if (mdb->transaction && mdb->changes > 25000) {
501 db_end_transaction(jcr, mdb);
503 if (!mdb->transaction) {
504 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
505 Dmsg0(400, "Start PosgreSQL transaction\n");
506 mdb->transaction = 1;
/*
 * Commit the open transaction, if any.
 *
 * If jcr is non-NULL and jcr->cached_attribute is set, the cached
 * attribute jcr->ar is first written with db_create_attributes_record();
 * a failure is reported as M_FATAL, and cached_attribute is then set to
 * false.
 *
 * Each engine-specific section tests !mdb->allow_transactions (the branch
 * body is not shown in this listing) and, when mdb->transaction is set,
 * sends "COMMIT", clears mdb->transaction and logs mdb->changes. The first
 * SQLite section uses my_sqlite_query(); the others use db_sql_query().
 * The last two sections are selected at run time from db_type.
 * The early-return test described by the in-body comment is not shown.
 * NOTE(review): the failure message uses db_strerror(jcr->db) although the
 * record was created on mdb -- confirm that is intended.
 */
513 void db_end_transaction(JCR *jcr, B_DB *mdb)
516 * This can be called during thread cleanup and
517 * the db may already be closed. So simply return.
523 if (jcr && jcr->cached_attribute) {
524 Dmsg0(400, "Flush last cached attribute.\n");
525 if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
526 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
528 jcr->cached_attribute = false;
532 if (!mdb->allow_transactions) {
536 if (mdb->transaction) {
537 my_sqlite_query(mdb, "COMMIT"); /* end transaction */
538 mdb->transaction = 0;
539 Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
548 if (!mdb->allow_transactions) {
552 if (mdb->transaction) {
553 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
554 mdb->transaction = 0;
555 Dmsg1(400, "End Ingres transaction changes=%d\n", mdb->changes);
562 #ifdef HAVE_POSTGRESQL
563 if (!mdb->allow_transactions) {
567 if (mdb->transaction) {
568 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
569 mdb->transaction = 0;
570 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
577 if (db_type == SQL_TYPE_SQLITE) {
578 if (!mdb->allow_transactions) {
582 if (mdb->transaction) {
583 //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
584 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
585 mdb->transaction = 0;
586 Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
590 } else if (db_type == SQL_TYPE_POSTGRESQL) {
591 if (!mdb->allow_transactions) {
595 if (mdb->transaction) {
596 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
597 mdb->transaction = 0;
598 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
607 * Given a full filename, split it into its path
608 * and filename parts. They are returned in pool memory
609 * in the mdb structure.
/*
 * Split fname at its last path separator into mdb->path (length mdb->pnl)
 * and mdb->fname (length mdb->fnl).
 *
 * The scan keeps f at the last character for which IsPathSeparator() is
 * true. When a separator was found, f is advanced just past it, so the
 * path (length f - fname) keeps its trailing separator. Both buffers are
 * sized with check_pool_memory_size() to length+1, filled with memcpy()
 * and NUL-terminated.
 * A zero path length is reported as M_ERROR ("Path length is zero").
 * The computation of mdb->fnl and several branch conditions are not shown
 * in this listing.
 */
611 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
615 /* Find path without the filename.
616 * I.e. everything after the last / is a "filename".
617 * OK, maybe it is a directory name, but we treat it like
618 * a filename. If we don't find a / then the whole name
619 * must be a path name (e.g. c:).
621 for (p=f=fname; *p; p++) {
622 if (IsPathSeparator(*p)) {
623 f = p; /* set pos of last slash */
626 if (IsPathSeparator(*f)) { /* did we find a slash? */
627 f++; /* yes, point to filename */
628 } else { /* no, whole thing must be path name */
632 /* If filename doesn't exist (i.e. root directory), we
633 * simply create a blank name consisting of a single
634 * space. This makes handling zero length filenames
639 mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
640 memcpy(mdb->fname, f, mdb->fnl); /* copy filename */
641 mdb->fname[mdb->fnl] = 0;
647 mdb->pnl = f - fname;
649 mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
650 memcpy(mdb->path, fname, mdb->pnl);
651 mdb->path[mdb->pnl] = 0;
653 Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
654 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
659 Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
663 * Set maximum field length to something reasonable
/*
 * Adjust a column width for display. The argument is copied to max_len and
 * the visible branch handles values above 100; the first test, the
 * replacement values and the return statement are on lines not shown in
 * this listing (presumably a clamp to a sensible range -- confirm).
 */
665 static int max_length(int max_length)
667 int max_len = max_length;
671 } else if (max_len > 100) {
678 * List dashes as part of header for listing SQL results in a table
/*
 * Produce the separator row for a tabular result listing.
 * Rewinds to field 0, then for each of the sql_num_fields() fields fetches
 * the field and loops max_length(field->max_length + 2) times. The
 * statements that build the text and hand it to send (presumably with ctx)
 * are not shown in this listing.
 */
681 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
687 sql_field_seek(mdb, 0);
689 for (i = 0; i < sql_num_fields(mdb); i++) {
690 field = sql_fetch_field(mdb);
694 len = max_length(field->max_length + 2);
695 for (j = 0; j < len; j++) {
704 * If type is VERT_LIST, we list vertically, otherwise we
705 * list each row on one line horizontally.
/*
 * Format the current result set (mdb->result) for display through send().
 *
 * With no result or zero rows, "No results to list." is sent.
 *
 * The first pass over the fields computes display widths. For VERT_LIST
 * the field-name length is compared against max_len (the assignment is not
 * shown in this listing). Otherwise field->max_length is widened for
 * thousands separators on numeric columns, the column width is raised to
 * at least the name length and to at least 4 (the word "NULL") when the
 * column is not flagged NOT NULL, and the width is stored back into
 * field->max_length.
 *
 * Horizontal output: a header row of field names framed by list_dashes(),
 * then one cell per column for every fetched row, and a closing
 * list_dashes(). Vertical output: one "name: value" line per field of
 * every row. NULL columns are printed as "NULL"; numeric columns whose
 * text is an integer go through add_commas() unless jcr->gui is set.
 *
 * Each cell is formatted into the 2000-byte buf with bsnprintf(); the
 * send() calls for those cells and the return statements are not shown.
 */
708 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
712 int i, col_len, max_len = 0;
713 char buf[2000], ewc[30];
715 Dmsg0(800, "list_result starts\n");
716 if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
717 send(ctx, _("No results to list.\n"));
721 Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
722 /* determine column display widths */
723 sql_field_seek(mdb, 0);
724 for (i = 0; i < sql_num_fields(mdb); i++) {
725 Dmsg1(800, "list_result processing field %d\n", i);
726 field = sql_fetch_field(mdb);
730 col_len = cstrlen(field->name);
731 if (type == VERT_LIST) {
732 if (col_len > max_len) {
736 if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
737 field->max_length += (field->max_length - 1) / 3;
739 if (col_len < (int)field->max_length) {
740 col_len = field->max_length;
742 if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
743 col_len = 4; /* 4 = length of the word "NULL" */
745 field->max_length = col_len; /* reset column info */
749 Dmsg0(800, "list_result finished first loop\n");
750 if (type == VERT_LIST) {
754 Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
755 list_dashes(mdb, send, ctx);
757 sql_field_seek(mdb, 0);
758 for (i = 0; i < sql_num_fields(mdb); i++) {
759 Dmsg1(800, "list_result looking at field %d\n", i);
760 field = sql_fetch_field(mdb);
764 max_len = max_length(field->max_length);
765 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
769 list_dashes(mdb, send, ctx);
771 Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
772 while ((row = sql_fetch_row(mdb)) != NULL) {
773 sql_field_seek(mdb, 0);
775 for (i = 0; i < sql_num_fields(mdb); i++) {
776 field = sql_fetch_field(mdb);
780 max_len = max_length(field->max_length);
781 if (row[i] == NULL) {
782 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
783 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
784 bsnprintf(buf, sizeof(buf), " %*s |", max_len,
785 add_commas(row[i], ewc));
787 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
793 list_dashes(mdb, send, ctx);
798 Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
799 while ((row = sql_fetch_row(mdb)) != NULL) {
800 sql_field_seek(mdb, 0);
801 for (i = 0; i < sql_num_fields(mdb); i++) {
802 field = sql_fetch_field(mdb);
806 if (row[i] == NULL) {
807 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
808 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
809 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
810 add_commas(row[i], ewc));
812 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
822 * Open a new connection to the mdb catalog. This function is used
823 * by batch and accurate mode.
/*
 * Make sure jcr->db_batch refers to an initialized and opened catalog
 * connection.
 *
 * If jcr->db_batch is not set, it is created with db_init_database(); the
 * last argument is multi_db, which is true under HAVE_BATCH_FILE_INSERT
 * and false otherwise (the other arguments are not shown in this listing).
 * Failure to initialize, or failure of db_open_database() on the batch
 * handle, is recorded in mdb->errmsg and reported as M_FATAL.
 * The return statements are not shown.
 */
825 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
827 #ifdef HAVE_BATCH_FILE_INSERT
828 const int multi_db = true; /* we force a new connection only if batch insert is enabled */
830 const int multi_db = false;
833 if (!jcr->db_batch) {
834 jcr->db_batch = db_init_database(jcr,
841 multi_db /* multi_db = true when using batch mode */);
842 if (!jcr->db_batch) {
843 Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
844 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
848 if (!db_open_database(jcr, jcr->db_batch)) {
849 Mmsg2(&mdb->errmsg, _("Could not open database \"%s\": ERR=%s\n"),
850 jcr->db_batch->db_name, db_strerror(jcr->db_batch));
851 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
854 Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
855 jcr->db_batch->connected, jcr->db_batch->db);
862 * !!! WARNING !!! Use this function only when Bacula is stopped,
863 * i.e., after a fatal signal and before exiting the program
864 * Print information about a B_DB object.
/*
 * Write a diagnostic description of a B_DB object to fp: its address,
 * db_name, db_user and connected flag, then the current cmd and changes
 * count, and -- when mdb->lock.valid equals RWLOCK_VALID -- the lock
 * address with its w_active and w_wait values. String members are passed
 * through NPRTB() before printing.
 * How mdb is obtained from jcr is not shown in this listing.
 */
866 void db_debug_print(JCR *jcr, FILE *fp)
874 fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
875 mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
876 fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
877 if (mdb->lock.valid == RWLOCK_VALID) {
878 fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
882 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI */