2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database interface routines
31 * Almost generic set of SQL database interface routines
32 * (with a little more work)
33 * SQL engine specific routines are in mysql.c, postgresql.c,
36 * Kern Sibbald, March 2000
38 * Version $Id: sql.c 8034 2008-11-11 14:33:46Z ricozz $
41 /* The following is necessary so that we do not include
42 * the dummy external definition of B_DB.
44 #define __SQL_C /* indicate that this is sql.c */
49 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
51 uint32_t bacula_db_version = 0;
53 int db_type = -1; /* SQL engine type index */
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60 const char *db_password, const char *db_address, int db_port,
61 const char *db_socket, int mult_db_connections)
66 Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
68 if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69 Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
71 p = (char *)(db_driver + 4);
72 if (strcasecmp(p, "mysql") == 0) {
73 db_type = SQL_TYPE_MYSQL;
74 } else if (strcasecmp(p, "postgresql") == 0) {
75 db_type = SQL_TYPE_POSTGRESQL;
76 } else if (strcasecmp(p, "sqlite") == 0) {
77 db_type = SQL_TYPE_SQLITE;
78 } else if (strcasecmp(p, "sqlite3") == 0) {
79 db_type = SQL_TYPE_SQLITE3;
80 } else if (strcasecmp(p, "ingres") == 0) {
81 db_type = SQL_TYPE_INGRES;
83 Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
86 db_type = SQL_TYPE_MYSQL;
88 db_type = SQL_TYPE_POSTGRESQL;
90 db_type = SQL_TYPE_INGRES;
92 db_type = SQL_TYPE_SQLITE;
94 db_type = SQL_TYPE_SQLITE3;
97 return db_init_database(jcr, db_name, db_user, db_password, db_address,
98 db_port, db_socket, mult_db_connections);
101 dbid_list::dbid_list()
103 memset(this, 0, sizeof(dbid_list));
105 DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
106 num_ids = num_seen = tot_ids = 0;
110 dbid_list::~dbid_list()
116 * Called here to retrieve an integer from the database
118 int db_int_handler(void *ctx, int num_fields, char **row)
120 uint32_t *val = (uint32_t *)ctx;
122 Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
125 Dmsg1(800, "int_handler finds '%s'\n", row[0]);
126 *val = str_to_int64(row[0]);
128 Dmsg0(800, "int_handler finds zero\n");
131 Dmsg0(800, "int_handler finishes\n");
136 * Called here to retrieve a 32/64 bit integer from the database.
137 * The returned integer will be extended to 64 bit.
139 int db_int64_handler(void *ctx, int num_fields, char **row)
141 db_int64_ctx *lctx = (db_int64_ctx *)ctx;
144 lctx->value = str_to_int64(row[0]);
151 * Use to build a comma separated list of values from a query. "10,20,30"
153 int db_list_handler(void *ctx, int num_fields, char **row)
155 db_list_ctx *lctx = (db_list_ctx *)ctx;
156 if (num_fields == 1 && row[0]) {
158 pm_strcat(lctx->list, ",");
160 pm_strcat(lctx->list, row[0]);
168 * Called here to retrieve an integer from the database
170 static int db_max_connections_handler(void *ctx, int num_fields, char **row)
172 uint32_t *val = (uint32_t *)ctx;
173 uint32_t index = sql_get_max_connections_index[db_type];
175 *val = str_to_int64(row[index]);
177 Dmsg0(800, "int_handler finds zero\n");
184 * Check catalog max_connections setting
186 bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
188 #ifdef HAVE_BATCH_FILE_INSERT
190 uint32_t max_conn = 0;
192 /* With Batch insert, verify max_connections */
193 if (!db_sql_query(mdb, sql_get_max_connections[db_type],
194 db_max_connections_handler, &max_conn)) {
195 Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
198 if (max_conn && max_concurrent_jobs > max_conn) {
200 _("Potential performance problem:\n"
201 "max_connections=%d set for %s database \"%s\" should be larger than Director's "
202 "MaxConcurrentJobs=%d\n"),
203 max_conn, db_get_type(), mdb->db_name, max_concurrent_jobs);
204 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
213 /* NOTE!!! The following routines expect that the
214 * calling subroutine sets and clears the mutex
217 /* Check that the tables correspond to the version we want */
218 bool check_tables_version(JCR *jcr, B_DB *mdb)
220 const char *query = "SELECT VersionId FROM Version";
222 bacula_db_version = 0;
223 if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
224 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
227 if (bacula_db_version != BDB_VERSION) {
228 Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
229 mdb->db_name, BDB_VERSION, bacula_db_version);
230 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
236 /* Utility routine for queries. The database MUST be locked before calling here. */
238 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
242 sql_free_result(mdb);
243 if ((status=sql_query(mdb, cmd)) != 0) {
244 m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
245 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
247 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
252 mdb->result = sql_store_result(mdb);
254 return mdb->result != NULL;
258 * Utility routine to do inserts
259 * Returns: 0 on failure
263 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
265 if (sql_query(mdb, cmd)) {
266 m_msg(file, line, &mdb->errmsg, _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
267 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
269 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
273 mdb->num_rows = sql_affected_rows(mdb);
274 if (mdb->num_rows != 1) {
276 m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
277 edit_uint64(mdb->num_rows, ed1));
279 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
287 /* Utility routine for updates.
288 * Returns: 0 on failure
292 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
295 if (sql_query(mdb, cmd)) {
296 m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
297 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
299 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
303 mdb->num_rows = sql_affected_rows(mdb);
304 if (mdb->num_rows < 1) {
306 m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
307 edit_uint64(mdb->num_rows, ed1), cmd);
309 // j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
317 /* Utility routine for deletes
319 * Returns: -1 on error
320 * n number of rows affected
323 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
326 if (sql_query(mdb, cmd)) {
327 m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
328 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
330 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
335 return sql_affected_rows(mdb);
340 * Get record max. Query is already in mdb->cmd
343 * Returns: -1 on failure
346 int get_sql_record_max(JCR *jcr, B_DB *mdb)
351 if (QUERY_DB(jcr, mdb, mdb->cmd)) {
352 if ((row = sql_fetch_row(mdb)) == NULL) {
353 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
356 stat = str_to_int64(row[0]);
358 sql_free_result(mdb);
360 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
367 * Return pre-edited error message
369 char *db_strerror(B_DB *mdb)
375 * Lock database, this can be called multiple times by the same
376 * thread without blocking, but must be unlocked the number of
377 * times it was locked.
379 void _db_lock(const char *file, int line, B_DB *mdb)
382 if ((errstat=rwl_writelock_p(&mdb->lock, file, line)) != 0) {
384 e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
385 errstat, be.bstrerror(errstat));
390 * Unlock the database. This can be called multiple times by the
391 * same thread up to the number of times that thread called
394 void _db_unlock(const char *file, int line, B_DB *mdb)
397 if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
399 e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
400 errstat, be.bstrerror(errstat));
405 * Start a transaction. This groups inserts and makes things
406 * much more efficient. Usually started when inserting
409 void db_start_transaction(JCR *jcr, B_DB *mdb)
412 jcr->attr = get_pool_memory(PM_FNAME);
415 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
419 if (!mdb->allow_transactions) {
423 /* Allow only 10,000 changes per transaction */
424 if (mdb->transaction && mdb->changes > 10000) {
425 db_end_transaction(jcr, mdb);
427 if (!mdb->transaction) {
428 my_sqlite_query(mdb, "BEGIN"); /* begin transaction */
429 Dmsg0(400, "Start SQLite transaction\n");
430 mdb->transaction = 1;
436 * This is turned off because transactions break
437 * if multiple simultaneous jobs are run.
439 #ifdef HAVE_POSTGRESQL
440 if (!mdb->allow_transactions) {
444 /* Allow only 25,000 changes per transaction */
445 if (mdb->transaction && mdb->changes > 25000) {
446 db_end_transaction(jcr, mdb);
448 if (!mdb->transaction) {
449 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
450 Dmsg0(400, "Start PosgreSQL transaction\n");
451 mdb->transaction = 1;
457 if (!mdb->allow_transactions) {
461 /* Allow only 25,000 changes per transaction */
462 if (mdb->transaction && mdb->changes > 25000) {
463 db_end_transaction(jcr, mdb);
465 if (!mdb->transaction) {
466 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
467 Dmsg0(400, "Start Ingres transaction\n");
468 mdb->transaction = 1;
474 if (db_type == SQL_TYPE_SQLITE) {
475 if (!mdb->allow_transactions) {
479 /* Allow only 10,000 changes per transaction */
480 if (mdb->transaction && mdb->changes > 10000) {
481 db_end_transaction(jcr, mdb);
483 if (!mdb->transaction) {
484 //my_sqlite_query(mdb, "BEGIN"); /* begin transaction */
485 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
486 Dmsg0(400, "Start SQLite transaction\n");
487 mdb->transaction = 1;
490 } else if (db_type == SQL_TYPE_POSTGRESQL) {
491 if (!mdb->allow_transactions) {
495 /* Allow only 25,000 changes per transaction */
496 if (mdb->transaction && mdb->changes > 25000) {
497 db_end_transaction(jcr, mdb);
499 if (!mdb->transaction) {
500 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
501 Dmsg0(400, "Start PosgreSQL transaction\n");
502 mdb->transaction = 1;
509 void db_end_transaction(JCR *jcr, B_DB *mdb)
512 * This can be called during thread cleanup and
513 * the db may already be closed. So simply return.
519 if (jcr && jcr->cached_attribute) {
520 Dmsg0(400, "Flush last cached attribute.\n");
521 if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
522 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
524 jcr->cached_attribute = false;
528 if (!mdb->allow_transactions) {
532 if (mdb->transaction) {
533 my_sqlite_query(mdb, "COMMIT"); /* end transaction */
534 mdb->transaction = 0;
535 Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
544 if (!mdb->allow_transactions) {
548 if (mdb->transaction) {
549 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
550 mdb->transaction = 0;
551 Dmsg1(400, "End Ingres transaction changes=%d\n", mdb->changes);
558 #ifdef HAVE_POSTGRESQL
559 if (!mdb->allow_transactions) {
563 if (mdb->transaction) {
564 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
565 mdb->transaction = 0;
566 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
573 if (db_type == SQL_TYPE_SQLITE) {
574 if (!mdb->allow_transactions) {
578 if (mdb->transaction) {
579 //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
580 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
581 mdb->transaction = 0;
582 Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
586 } else if (db_type == SQL_TYPE_POSTGRESQL) {
587 if (!mdb->allow_transactions) {
591 if (mdb->transaction) {
592 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
593 mdb->transaction = 0;
594 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
603 * Given a full filename, split it into its path
604 * and filename parts. They are returned in pool memory
605 * in the mdb structure.
607 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
611 /* Find path without the filename.
612 * I.e. everything after the last / is a "filename".
613 * OK, maybe it is a directory name, but we treat it like
614 * a filename. If we don't find a / then the whole name
615 * must be a path name (e.g. c:).
617 for (p=f=fname; *p; p++) {
618 if (IsPathSeparator(*p)) {
619 f = p; /* set pos of last slash */
622 if (IsPathSeparator(*f)) { /* did we find a slash? */
623 f++; /* yes, point to filename */
624 } else { /* no, whole thing must be path name */
628 /* If filename doesn't exist (i.e. root directory), we
629 * simply create a blank name consisting of a single
630 * space. This makes handling zero length filenames
635 mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
636 memcpy(mdb->fname, f, mdb->fnl); /* copy filename */
637 mdb->fname[mdb->fnl] = 0;
643 mdb->pnl = f - fname;
645 mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
646 memcpy(mdb->path, fname, mdb->pnl);
647 mdb->path[mdb->pnl] = 0;
649 Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
650 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
655 Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
659 * Set maximum field length to something reasonable
661 static int max_length(int max_length)
663 int max_len = max_length;
667 } else if (max_len > 100) {
674 * List dashes as part of header for listing SQL results in a table
677 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
683 sql_field_seek(mdb, 0);
685 for (i = 0; i < sql_num_fields(mdb); i++) {
686 field = sql_fetch_field(mdb);
690 len = max_length(field->max_length + 2);
691 for (j = 0; j < len; j++) {
700 * If full_list is set, we list vertically, otherwise, we
701 * list on one line horizontally.
704 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
708 int i, col_len, max_len = 0;
709 char buf[2000], ewc[30];
711 Dmsg0(800, "list_result starts\n");
712 if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
713 send(ctx, _("No results to list.\n"));
717 Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
718 /* determine column display widths */
719 sql_field_seek(mdb, 0);
720 for (i = 0; i < sql_num_fields(mdb); i++) {
721 Dmsg1(800, "list_result processing field %d\n", i);
722 field = sql_fetch_field(mdb);
726 col_len = cstrlen(field->name);
727 if (type == VERT_LIST) {
728 if (col_len > max_len) {
732 if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
733 field->max_length += (field->max_length - 1) / 3;
735 if (col_len < (int)field->max_length) {
736 col_len = field->max_length;
738 if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
739 col_len = 4; /* 4 = length of the word "NULL" */
741 field->max_length = col_len; /* reset column info */
745 Dmsg0(800, "list_result finished first loop\n");
746 if (type == VERT_LIST) {
750 Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
751 list_dashes(mdb, send, ctx);
753 sql_field_seek(mdb, 0);
754 for (i = 0; i < sql_num_fields(mdb); i++) {
755 Dmsg1(800, "list_result looking at field %d\n", i);
756 field = sql_fetch_field(mdb);
760 max_len = max_length(field->max_length);
761 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
765 list_dashes(mdb, send, ctx);
767 Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
768 while ((row = sql_fetch_row(mdb)) != NULL) {
769 sql_field_seek(mdb, 0);
771 for (i = 0; i < sql_num_fields(mdb); i++) {
772 field = sql_fetch_field(mdb);
776 max_len = max_length(field->max_length);
777 if (row[i] == NULL) {
778 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
779 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
780 bsnprintf(buf, sizeof(buf), " %*s |", max_len,
781 add_commas(row[i], ewc));
783 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
789 list_dashes(mdb, send, ctx);
794 Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
795 while ((row = sql_fetch_row(mdb)) != NULL) {
796 sql_field_seek(mdb, 0);
797 for (i = 0; i < sql_num_fields(mdb); i++) {
798 field = sql_fetch_field(mdb);
802 if (row[i] == NULL) {
803 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
804 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
805 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
806 add_commas(row[i], ewc));
808 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
818 * Open a new connexion to mdb catalog. This function is used
819 * by batch and accurate mode.
821 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
823 #ifdef HAVE_BATCH_FILE_INSERT
824 const int multi_db = true; /* we force a new connection only if batch insert is enabled */
826 const int multi_db = false;
829 if (!jcr->db_batch) {
830 jcr->db_batch = db_init_database(jcr,
837 multi_db /* multi_db = true when using batch mode */);
838 if (!jcr->db_batch) {
839 Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
840 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
844 if (!db_open_database(jcr, jcr->db_batch)) {
845 Mmsg2(&mdb->errmsg, _("Could not open database \"%s\": ERR=%s\n"),
846 jcr->db_batch->db_name, db_strerror(jcr->db_batch));
847 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
850 Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
851 jcr->db_batch->connected, jcr->db_batch->db);
858 * !!! WARNING !!! Use this function only when bacula is stopped.
859 * ie, after a fatal signal and before exiting the program
860 * Print information about a B_DB object.
862 void db_debug_print(JCR *jcr, FILE *fp)
870 fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
871 mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
872 fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
873 if (mdb->lock.valid == RWLOCK_VALID) {
874 fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
878 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES*/