2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database interface routines
31 * Almost generic set of SQL database interface routines
32 * (with a little more work)
33 * SQL engine specific routines are in mysql.c, postgresql.c,
36 * Kern Sibbald, March 2000
38 * Version $Id: sql.c 8034 2008-11-11 14:33:46Z ricozz $
41 /* The following is necessary so that we do not include
42 * the dummy external definition of B_DB.
44 #define __SQL_C /* indicate that this is sql.c */
49 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
// Catalog schema version last read from the database. check_tables_version()
// resets it to 0 and fills it from the Version table before comparing it
// with BDB_VERSION.
51 uint32_t bacula_db_version = 0;
// Index of the selected SQL engine: -1 until db_init() assigns one of the
// SQL_TYPE_* values. It also indexes the per-engine max_connections tables.
53 int db_type = -1; /* SQL engine type index */
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
// Select the SQL engine (db_type) and create the catalog database handle.
// A dbi driver string must be at least 5 characters long and look like
// dbi:<type>. The part after the colon is compared case-insensitively with
// mysql, postgresql, sqlite and sqlite3, and any other value is reported
// with M_ABORT.
// The later plain assignments to db_type cover the other build flavours.
// NOTE(review): the preprocessor conditions that select them are not visible
// in this excerpt - confirm against the full file.
// Returns whatever db_init_database() returns. db_driver itself is not
// forwarded to that call.
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60 const char *db_password, const char *db_address, int db_port,
61 const char *db_socket, int mult_db_connections)
66 Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
68 if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69 Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
// Skip the 4-character dbi: prefix so that p points at the engine name.
71 p = (char *)(db_driver + 4);
72 if (strcasecmp(p, "mysql") == 0) {
73 db_type = SQL_TYPE_MYSQL;
74 } else if (strcasecmp(p, "postgresql") == 0) {
75 db_type = SQL_TYPE_POSTGRESQL;
76 } else if (strcasecmp(p, "sqlite") == 0) {
77 db_type = SQL_TYPE_SQLITE;
78 } else if (strcasecmp(p, "sqlite3") == 0) {
79 db_type = SQL_TYPE_SQLITE3;
81 Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
84 db_type = SQL_TYPE_MYSQL;
86 db_type = SQL_TYPE_POSTGRESQL;
88 db_type = SQL_TYPE_INGRES;
90 db_type = SQL_TYPE_SQLITE;
92 db_type = SQL_TYPE_SQLITE3;
95 return db_init_database(jcr, db_name, db_user, db_password, db_address,
96 db_port, db_socket, mult_db_connections);
// Constructor: zero the whole object, allocate the DBId array sized for
// max_ids entries and reset the three counters.
// NOTE(review): the statement that sets max_ids after the memset is not
// visible in this excerpt - confirm it is assigned before the malloc.
99 dbid_list::dbid_list()
101 memset(this, 0, sizeof(dbid_list));
103 DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
104 num_ids = num_seen = tot_ids = 0;
// Destructor (its body is not visible in this excerpt).
108 dbid_list::~dbid_list()
114 * Called here to retrieve an integer from the database
// Row callback: converts the first column of the row with str_to_int64()
// and stores it in the uint32_t that ctx points to.
// num_fields is not used in the visible lines.
// NOTE(review): the 64-bit conversion result is narrowed to 32 bits on
// assignment, and the first debug message formats the row pointer with %x.
// Confirm both are intended.
116 int db_int_handler(void *ctx, int num_fields, char **row)
118 uint32_t *val = (uint32_t *)ctx;
120 Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
123 Dmsg1(800, "int_handler finds '%s'\n", row[0]);
124 *val = str_to_int64(row[0]);
126 Dmsg0(800, "int_handler finds zero\n");
129 Dmsg0(800, "int_handler finishes\n");
134 * Called here to retrieve a 32/64 bit integer from the database.
135 * The returned integer will be extended to 64 bit.
// Row callback: converts the first column of the row with str_to_int64()
// and stores it in the value member of the db_int64_ctx passed as ctx.
137 int db_int64_handler(void *ctx, int num_fields, char **row)
139 db_int64_ctx *lctx = (db_int64_ctx *)ctx;
142 lctx->value = str_to_int64(row[0]);
149 * Use to build a comma separated list of values from a query. "10,20,30"
// Row callback: appends the first column to lctx->list, but only for
// single-column rows whose value is not NULL. A comma is appended before the
// value as separator (the condition guarding the comma is not visible in
// this excerpt).
151 int db_list_handler(void *ctx, int num_fields, char **row)
153 db_list_ctx *lctx = (db_list_ctx *)ctx;
154 if (num_fields == 1 && row[0]) {
156 pm_strcat(lctx->list, ",");
158 pm_strcat(lctx->list, row[0]);
166 * Called here to retrieve the max_connections value from the database
// Row callback for the max_connections query. The column to read is looked
// up per engine in sql_get_max_connections_index[db_type], converted with
// str_to_int64() and stored in the uint32_t that ctx points to.
// NOTE(review): the debug text still says int_handler, which looks copied
// from db_int_handler. It is a runtime string and is left unchanged here.
168 static int db_max_connections_handler(void *ctx, int num_fields, char **row)
170 uint32_t *val = (uint32_t *)ctx;
171 uint32_t index = sql_get_max_connections_index[db_type];
173 *val = str_to_int64(row[index]);
175 Dmsg0(800, "int_handler finds zero\n");
182 * Check catalog max_connections setting
// Compare the catalog max_connections setting with max_concurrent_jobs.
// Runs the engine-specific query sql_get_max_connections[db_type] with
// db_max_connections_handler filling max_conn. A query failure is reported
// as M_ERROR together with mdb->errmsg.
// When both values are non-zero and the job count exceeds max_conn, the
// warning text held in mdb->errmsg is sent with M_WARNING.
// NOTE(review): the return statements and the code under the #ifndef are
// not visible in this excerpt.
184 bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
189 /* Without Batch insert, no need to verify max_connections */
190 #ifndef HAVE_BATCH_FILE_INSERT
194 /* Check max_connections setting */
195 if (!db_sql_query(mdb, sql_get_max_connections[db_type],
196 db_max_connections_handler, &max_conn)) {
197 Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
200 if (max_conn && max_concurrent_jobs && max_concurrent_jobs > max_conn) {
202 _("On db_name=%s, %s max_connections=%d is lower than Director "
203 "MaxConcurentJobs=%d\n"),
204 mdb->db_name, db_get_type(), max_conn, max_concurrent_jobs);
205 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
212 /* NOTE!!! The following routines expect that the
213 * calling subroutine sets and clears the mutex
216 /* Check that the tables correspond to the version we want */
// Verify that the catalog schema version matches BDB_VERSION.
// Resets bacula_db_version to 0, then reads VersionId from the Version table
// into it through db_int_handler. A failed query is reported as M_FATAL.
// A version mismatch is formatted into mdb->errmsg and also reported as
// M_FATAL.
// NOTE(review): the return statements are not visible in this excerpt.
217 bool check_tables_version(JCR *jcr, B_DB *mdb)
219 const char *query = "SELECT VersionId FROM Version";
221 bacula_db_version = 0;
222 if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
223 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
226 if (bacula_db_version != BDB_VERSION) {
227 Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
228 mdb->db_name, BDB_VERSION, bacula_db_version);
229 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
235 /* Utility routine for queries. The database MUST be locked before calling here. */
// Run cmd as a query and keep its result set in mdb->result.
// Any previous result is freed first. file and line are forwarded to m_msg
// and j_msg. On failure the error text is formatted into mdb->errmsg and
// reported as M_FATAL. The command itself may also be echoed at M_INFO (the
// condition guarding that echo is not visible in this excerpt).
// On the success path the return value is non-zero when a result set was
// stored. The failure-path return is not visible here.
237 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
241 sql_free_result(mdb);
242 if ((status=sql_query(mdb, cmd)) != 0) {
243 m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
244 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
246 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
251 mdb->result = sql_store_result(mdb);
253 return mdb->result != NULL;
257 * Utility routine to do inserts
258 * Returns: 0 on failure
// Run cmd as an insert statement.
// A failing sql_query() is formatted into mdb->errmsg and reported as
// M_FATAL. When mdb->have_insert_id is set, mdb->num_rows is refreshed from
// sql_affected_rows(). A num_rows other than 1 is recorded in mdb->errmsg as
// an insertion problem.
// NOTE(review): the conditions guarding the M_INFO echoes of cmd and the
// return statements are not visible in this excerpt.
262 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
264 if (sql_query(mdb, cmd)) {
265 m_msg(file, line, &mdb->errmsg, _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
266 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
268 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
272 if (mdb->have_insert_id) {
273 mdb->num_rows = sql_affected_rows(mdb);
277 if (mdb->num_rows != 1) {
279 m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
280 edit_uint64(mdb->num_rows, ed1));
282 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
290 /* Utility routine for updates.
291 * Returns: 0 on failure
// Run cmd as an update statement.
// A failing sql_query() is formatted into mdb->errmsg and reported as
// M_ERROR (not M_FATAL as in InsertDB). mdb->num_rows is then taken from
// sql_affected_rows(). A value below 1 is recorded in mdb->errmsg as a
// failed update, and the M_INFO echo for that case is commented out.
// NOTE(review): the return statements are not visible in this excerpt.
295 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
298 if (sql_query(mdb, cmd)) {
299 m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
300 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
302 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
306 mdb->num_rows = sql_affected_rows(mdb);
307 if (mdb->num_rows < 1) {
309 m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
310 edit_uint64(mdb->num_rows, ed1), cmd);
312 // j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
320 /* Utility routine for deletes
322 * Returns: -1 on error
323 * n number of rows affected
// Run cmd as a delete statement.
// A failing sql_query() is formatted into mdb->errmsg and reported as
// M_ERROR. On the success path the function returns sql_affected_rows().
// NOTE(review): the failure-path return is not visible in this excerpt.
326 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
329 if (sql_query(mdb, cmd)) {
330 m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
331 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
333 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
338 return sql_affected_rows(mdb);
343 * Get record max. Query is already in mdb->cmd
346 * Returns: -1 on failure
// Run the query already stored in mdb->cmd through QUERY_DB, convert the
// first column of the first row with str_to_int64() into stat and free the
// result set. A missing row leaves an error text in mdb->errmsg.
// NOTE(review): the second Mmsg1 reuses the fetching-row text although,
// from the visible lines, it appears to sit on the query-failure branch.
// Confirm against the full file.
349 int get_sql_record_max(JCR *jcr, B_DB *mdb)
354 if (QUERY_DB(jcr, mdb, mdb->cmd)) {
355 if ((row = sql_fetch_row(mdb)) == NULL) {
356 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
359 stat = str_to_int64(row[0]);
361 sql_free_result(mdb);
363 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
370 * Return pre-edited error message
// Accessor for the error text of a catalog handle.
// NOTE(review): the body is not visible in this excerpt. Presumably it
// returns mdb->errmsg - confirm against the full file.
372 char *db_strerror(B_DB *mdb)
378 * Lock database, this can be called multiple times by the same
379 * thread without blocking, but must be unlocked the number of
380 * times it was locked.
// Take the write lock mdb->lock through rwl_writelock_p(), passing the
// caller file and line. A non-zero status is reported through e_msg as
// M_FATAL together with the text from be.bstrerror(errstat).
382 void _db_lock(const char *file, int line, B_DB *mdb)
385 if ((errstat=rwl_writelock_p(&mdb->lock, file, line)) != 0) {
387 e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
388 errstat, be.bstrerror(errstat));
393 * Unlock the database. This can be called multiple times by the
394 * same thread up to the number of times that thread called
// Release the write lock mdb->lock through rwl_writeunlock(). A non-zero
// status is reported through e_msg as M_FATAL together with the text from
// be.bstrerror(errstat). file and line are used only for that message.
397 void _db_unlock(const char *file, int line, B_DB *mdb)
400 if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
402 e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
403 errstat, be.bstrerror(errstat));
408 * Start a transaction. This groups inserts and makes things
409 * much more efficient. Usually started when inserting
// Begin a catalog transaction unless one is already open.
// First the per-job attribute storage is prepared: jcr->attr from pool
// memory and jcr->ar as a malloc-ed ATTR_DBR (the guards around these two
// allocations are not visible in this excerpt).
// Every engine section then follows the same pattern:
//   - test mdb->allow_transactions (the action taken is not visible here)
//   - commit the running transaction once mdb->changes exceeds the engine
//     limit: 10000 for SQLite, 25000 for PostgreSQL and Ingres
//   - issue BEGIN and set mdb->transaction to 1 when none is open
// The last two sections choose the engine at run time from db_type.
// NOTE(review): two debug strings spell the engine as PosgreSQL. They are
// runtime strings and are left unchanged here.
412 void db_start_transaction(JCR *jcr, B_DB *mdb)
415 jcr->attr = get_pool_memory(PM_FNAME);
418 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
// SQLite section (uses my_sqlite_query).
422 if (!mdb->allow_transactions) {
426 /* Allow only 10,000 changes per transaction */
427 if (mdb->transaction && mdb->changes > 10000) {
428 db_end_transaction(jcr, mdb);
430 if (!mdb->transaction) {
431 my_sqlite_query(mdb, "BEGIN"); /* begin transaction */
432 Dmsg0(400, "Start SQLite transaction\n");
433 mdb->transaction = 1;
439 * This is turned off because transactions break
440 * if multiple simultaneous jobs are run.
442 #ifdef HAVE_POSTGRESQL
// PostgreSQL section.
443 if (!mdb->allow_transactions) {
447 /* Allow only 25,000 changes per transaction */
448 if (mdb->transaction && mdb->changes > 25000) {
449 db_end_transaction(jcr, mdb);
451 if (!mdb->transaction) {
452 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
453 Dmsg0(400, "Start PosgreSQL transaction\n");
454 mdb->transaction = 1;
// Ingres section.
460 if (!mdb->allow_transactions) {
464 /* Allow only 25,000 changes per transaction */
465 if (mdb->transaction && mdb->changes > 25000) {
466 db_end_transaction(jcr, mdb);
468 if (!mdb->transaction) {
469 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
470 Dmsg0(400, "Start Ingres transaction\n");
471 mdb->transaction = 1;
// Run-time dispatch on db_type: SQLite first, then PostgreSQL.
477 if (db_type == SQL_TYPE_SQLITE) {
478 if (!mdb->allow_transactions) {
482 /* Allow only 10,000 changes per transaction */
483 if (mdb->transaction && mdb->changes > 10000) {
484 db_end_transaction(jcr, mdb);
486 if (!mdb->transaction) {
487 //my_sqlite_query(mdb, "BEGIN"); /* begin transaction */
488 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
489 Dmsg0(400, "Start SQLite transaction\n");
490 mdb->transaction = 1;
493 } else if (db_type == SQL_TYPE_POSTGRESQL) {
494 if (!mdb->allow_transactions) {
498 /* Allow only 25,000 changes per transaction */
499 if (mdb->transaction && mdb->changes > 25000) {
500 db_end_transaction(jcr, mdb);
502 if (!mdb->transaction) {
503 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
504 Dmsg0(400, "Start PosgreSQL transaction\n");
505 mdb->transaction = 1;
// Commit the open catalog transaction, if there is one.
// When jcr is non-NULL and has a cached attribute, that record is flushed
// first with db_create_attributes_record(). A failure there is reported as
// M_FATAL, and cached_attribute is reset to false.
// Every engine section then tests mdb->allow_transactions (the action taken
// is not visible in this excerpt) and, when mdb->transaction is set, issues
// COMMIT, clears the flag and logs mdb->changes.
// NOTE(review): the attribute error message reads db_strerror(jcr->db)
// rather than the mdb argument - confirm that these are the same handle.
512 void db_end_transaction(JCR *jcr, B_DB *mdb)
515 * This can be called during thread cleanup and
516 * the db may already be closed. So simply return.
522 if (jcr && jcr->cached_attribute) {
523 Dmsg0(400, "Flush last cached attribute.\n");
524 if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
525 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
527 jcr->cached_attribute = false;
// SQLite section (uses my_sqlite_query).
531 if (!mdb->allow_transactions) {
535 if (mdb->transaction) {
536 my_sqlite_query(mdb, "COMMIT"); /* end transaction */
537 mdb->transaction = 0;
538 Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
// Ingres section.
547 if (!mdb->allow_transactions) {
551 if (mdb->transaction) {
552 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
553 mdb->transaction = 0;
554 Dmsg1(400, "End Ingres transaction changes=%d\n", mdb->changes);
561 #ifdef HAVE_POSTGRESQL
// PostgreSQL section.
562 if (!mdb->allow_transactions) {
566 if (mdb->transaction) {
567 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
568 mdb->transaction = 0;
569 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
// Run-time dispatch on db_type: SQLite first, then PostgreSQL.
576 if (db_type == SQL_TYPE_SQLITE) {
577 if (!mdb->allow_transactions) {
581 if (mdb->transaction) {
582 //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
583 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
584 mdb->transaction = 0;
585 Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
589 } else if (db_type == SQL_TYPE_POSTGRESQL) {
590 if (!mdb->allow_transactions) {
594 if (mdb->transaction) {
595 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
596 mdb->transaction = 0;
597 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
606 * Given a full filename, split it into its path
607 * and filename parts. They are returned in pool memory
608 * in the mdb structure.
// Split fname at its last path separator.
// The scan leaves f on the last separator found. When there is one, f is
// advanced past it, so the path part keeps its trailing separator.
// The filename part (mdb->fnl bytes starting at f) is copied to mdb->fname
// and the leading part (mdb->pnl = f - fname bytes) to mdb->path. Both
// buffers are grown with check_pool_memory_size() and NUL-terminated.
// A zero path length is formatted into mdb->errmsg and reported as M_ERROR.
// NOTE(review): the assignment of mdb->fnl and the handling of the
// no-separator and empty-filename cases are not visible in this excerpt.
610 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
614 /* Find path without the filename.
615 * I.e. everything after the last / is a "filename".
616 * OK, maybe it is a directory name, but we treat it like
617 * a filename. If we don't find a / then the whole name
618 * must be a path name (e.g. c:).
620 for (p=f=fname; *p; p++) {
621 if (IsPathSeparator(*p)) {
622 f = p; /* set pos of last slash */
625 if (IsPathSeparator(*f)) { /* did we find a slash? */
626 f++; /* yes, point to filename */
627 } else { /* no, whole thing must be path name */
631 /* If filename doesn't exist (i.e. root directory), we
632 * simply create a blank name consisting of a single
633 * space. This makes handling zero length filenames
638 mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
639 memcpy(mdb->fname, f, mdb->fnl); /* copy filename */
640 mdb->fname[mdb->fnl] = 0;
646 mdb->pnl = f - fname;
648 mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
649 memcpy(mdb->path, fname, mdb->pnl);
650 mdb->path[mdb->pnl] = 0;
652 Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
653 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
658 Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
662 * Set maximum field length to something reasonable
// Limit a field display width. The visible lines copy the argument into
// max_len and test it against an upper threshold of 100.
// NOTE(review): the lower bound, the replacement values and the return
// statement are not visible in this excerpt. The parameter has the same
// name as the function itself.
664 static int max_length(int max_length)
666 int max_len = max_length;
670 } else if (max_len > 100) {
677 * List dashes as part of header for listing SQL results in a table
// Produce the separator line of a result table. Rewinds to the first field,
// then for every field computes len = max_length(field->max_length + 2) and
// loops len times.
// NOTE(review): the loop body and the calls to send are not visible in this
// excerpt.
680 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
686 sql_field_seek(mdb, 0);
688 for (i = 0; i < sql_num_fields(mdb); i++) {
689 field = sql_fetch_field(mdb);
693 len = max_length(field->max_length + 2);
694 for (j = 0; j < len; j++) {
703 * If full_list is set, we list vertically, otherwise, we
704 * list on one line horizontally.
// List the current result set of mdb through the send callback.
// With no result or zero rows, only the No results to list message is sent.
// Pass 1 computes display widths per field:
//   - VERT_LIST tracks the longest field name in max_len
//   - otherwise numeric fields are widened by (max_length - 1) / 3 for
//     thousands separators, the width is at least the data width, and
//     nullable fields get at least 4 so the word NULL fits. The result is
//     written back to field->max_length.
// Horizontal output: a header row framed by list_dashes(), then one line per
// row. NULL values and plain text are left-aligned. Integer values of
// numeric fields are right-aligned and passed through add_commas() unless
// jcr->gui is set.
// Vertical output: one name: value line per field, with the field name
// right-aligned to max_len.
// NOTE(review): the calls to send for each formatted buf and the jump to the
// vertical branch are not visible in this excerpt.
707 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
711 int i, col_len, max_len = 0;
712 char buf[2000], ewc[30];
714 Dmsg0(800, "list_result starts\n");
715 if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
716 send(ctx, _("No results to list.\n"));
720 Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
721 /* determine column display widths */
722 sql_field_seek(mdb, 0);
723 for (i = 0; i < sql_num_fields(mdb); i++) {
724 Dmsg1(800, "list_result processing field %d\n", i);
725 field = sql_fetch_field(mdb);
729 col_len = cstrlen(field->name);
730 if (type == VERT_LIST) {
731 if (col_len > max_len) {
735 if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
736 field->max_length += (field->max_length - 1) / 3;
738 if (col_len < (int)field->max_length) {
739 col_len = field->max_length;
741 if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
742 col_len = 4; /* 4 = length of the word "NULL" */
744 field->max_length = col_len; /* reset column info */
748 Dmsg0(800, "list_result finished first loop\n");
749 if (type == VERT_LIST) {
// Horizontal table: header row framed by dash lines.
753 Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
754 list_dashes(mdb, send, ctx);
756 sql_field_seek(mdb, 0);
757 for (i = 0; i < sql_num_fields(mdb); i++) {
758 Dmsg1(800, "list_result looking at field %d\n", i);
759 field = sql_fetch_field(mdb);
763 max_len = max_length(field->max_length);
764 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
768 list_dashes(mdb, send, ctx);
// One table line per result row.
770 Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
771 while ((row = sql_fetch_row(mdb)) != NULL) {
772 sql_field_seek(mdb, 0);
774 for (i = 0; i < sql_num_fields(mdb); i++) {
775 field = sql_fetch_field(mdb);
779 max_len = max_length(field->max_length);
780 if (row[i] == NULL) {
781 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
782 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
783 bsnprintf(buf, sizeof(buf), " %*s |", max_len,
784 add_commas(row[i], ewc));
786 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
792 list_dashes(mdb, send, ctx);
// Vertical listing: one name: value line per field of every row.
797 Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
798 while ((row = sql_fetch_row(mdb)) != NULL) {
799 sql_field_seek(mdb, 0);
800 for (i = 0; i < sql_num_fields(mdb); i++) {
801 field = sql_fetch_field(mdb);
805 if (row[i] == NULL) {
806 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
807 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
808 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
809 add_commas(row[i], ewc));
811 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
821 * Open a new connection to the mdb catalog. This function is used
822 * by batch and accurate mode.
// Create and open the extra catalog handle jcr->db_batch when the job does
// not have one yet.
// multi_db is true only when HAVE_BATCH_FILE_INSERT is defined, and it is
// the last argument passed to db_init_database().
// A failed init or a failed db_open_database() is formatted into
// mdb->errmsg and reported as M_FATAL.
// NOTE(review): the other arguments of db_init_database() and the return
// statements are not visible in this excerpt.
824 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
826 #ifdef HAVE_BATCH_FILE_INSERT
827 const int multi_db = true; /* we force a new connection only if batch insert is enabled */
829 const int multi_db = false;
832 if (!jcr->db_batch) {
833 jcr->db_batch = db_init_database(jcr,
840 multi_db /* multi_db = true when using batch mode */);
841 if (!jcr->db_batch) {
842 Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
843 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
847 if (!db_open_database(jcr, jcr->db_batch)) {
848 Mmsg2(&mdb->errmsg, _("Could not open database \"%s\": ERR=%s\n"),
849 jcr->db_batch->db_name, db_strerror(jcr->db_batch));
850 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
853 Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
854 jcr->db_batch->connected, jcr->db_batch->db);
861 * !!! WARNING !!! Use this function only when Bacula is stopped,
862 * i.e. after a fatal signal and before exiting the program.
863 * Print information about a B_DB object.
// Write a diagnostic dump of a B_DB object to fp: its address, db_name,
// db_user and connected flag, then the current cmd and the changes counter.
// The lock address and its w_active and w_wait fields are printed only when
// mdb->lock.valid equals RWLOCK_VALID.
// NOTE(review): how mdb is obtained from jcr is not visible in this excerpt.
865 void db_debug_print(JCR *jcr, FILE *fp)
873 fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
874 mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
875 fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
876 if (mdb->lock.valid == RWLOCK_VALID) {
877 fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
881 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI */