2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database interface routines
31 * Almost generic set of SQL database interface routines
32 * (with a little more work)
33 * SQL engine specific routines are in mysql.c, postgresql.c,
36 * Kern Sibbald, March 2000
38 * Version $Id: sql.c 8034 2008-11-11 14:33:46Z ricozz $
41 /* The following is necessary so that we do not include
42 * the dummy external definition of B_DB.
44 #define __SQL_C /* indicate that this is sql.c */
49 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES || HAVE_DBI
51 uint32_t bacula_db_version = 0;
53 int db_type = -1; /* SQL engine type index */
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
60 const char *db_password, const char *db_address, int db_port,
61 const char *db_socket, int mult_db_connections)
66 Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
68 if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69 Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
71 p = (char *)(db_driver + 4);
72 if (strcasecmp(p, "mysql") == 0) {
73 db_type = SQL_TYPE_MYSQL;
74 } else if (strcasecmp(p, "postgresql") == 0) {
75 db_type = SQL_TYPE_POSTGRESQL;
76 } else if (strcasecmp(p, "sqlite") == 0) {
77 db_type = SQL_TYPE_SQLITE;
78 } else if (strcasecmp(p, "sqlite3") == 0) {
79 db_type = SQL_TYPE_SQLITE3;
80 } else if (strcasecmp(p, "ingres") == 0) {
81 db_type = SQL_TYPE_INGRES;
83 Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
86 db_type = SQL_TYPE_MYSQL;
88 db_type = SQL_TYPE_POSTGRESQL;
90 db_type = SQL_TYPE_INGRES;
92 db_type = SQL_TYPE_SQLITE;
94 db_type = SQL_TYPE_SQLITE3;
97 return db_init_database(jcr, db_name, db_user, db_password, db_address,
98 db_port, db_socket, mult_db_connections);
101 dbid_list::dbid_list()
103 memset(this, 0, sizeof(dbid_list));
105 DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
106 num_ids = num_seen = tot_ids = 0;
110 dbid_list::~dbid_list()
116 * Called here to retrieve an integer from the database
118 int db_int_handler(void *ctx, int num_fields, char **row)
120 uint32_t *val = (uint32_t *)ctx;
122 Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
125 Dmsg1(800, "int_handler finds '%s'\n", row[0]);
126 *val = str_to_int64(row[0]);
128 Dmsg0(800, "int_handler finds zero\n");
131 Dmsg0(800, "int_handler finishes\n");
136 * Called here to retrieve a 32/64 bit integer from the database.
137 * The returned integer will be extended to 64 bit.
139 int db_int64_handler(void *ctx, int num_fields, char **row)
141 db_int64_ctx *lctx = (db_int64_ctx *)ctx;
144 lctx->value = str_to_int64(row[0]);
151 * Use to build a comma separated list of values from a query. "10,20,30"
153 int db_list_handler(void *ctx, int num_fields, char **row)
155 db_list_ctx *lctx = (db_list_ctx *)ctx;
156 if (num_fields == 1 && row[0]) {
158 pm_strcat(lctx->list, ",");
160 pm_strcat(lctx->list, row[0]);
168 * Called here to retrieve an integer from the database
170 static int db_max_connections_handler(void *ctx, int num_fields, char **row)
172 uint32_t *val = (uint32_t *)ctx;
173 uint32_t index = sql_get_max_connections_index[db_type];
175 *val = str_to_int64(row[index]);
177 Dmsg0(800, "int_handler finds zero\n");
184 * Check catalog max_connections setting
186 bool db_check_max_connections(JCR *jcr, B_DB *mdb, uint32_t max_concurrent_jobs)
191 /* Without Batch insert, no need to verify max_connections */
192 #ifndef HAVE_BATCH_FILE_INSERT
196 /* Check max_connections setting */
197 if (!db_sql_query(mdb, sql_get_max_connections[db_type],
198 db_max_connections_handler, &max_conn)) {
199 Jmsg(jcr, M_ERROR, 0, "Can't verify max_connections settings %s", mdb->errmsg);
202 if (max_conn && max_concurrent_jobs && max_concurrent_jobs > max_conn) {
204 _("Potential performance problem:\n"
205 "max_connections=%d set for %s database \"%s\" should be larger than Director's "
206 "MaxConcurrentJobs=%d\n"),
207 max_conn, db_get_type(), mdb->db_name, max_concurrent_jobs);
208 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
215 /* NOTE!!! The following routines expect that the
216 * calling subroutine sets and clears the mutex
219 /* Check that the tables correspond to the version we want */
220 bool check_tables_version(JCR *jcr, B_DB *mdb)
222 const char *query = "SELECT VersionId FROM Version";
224 bacula_db_version = 0;
225 if (!db_sql_query(mdb, query, db_int_handler, (void *)&bacula_db_version)) {
226 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
229 if (bacula_db_version != BDB_VERSION) {
230 Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
231 mdb->db_name, BDB_VERSION, bacula_db_version);
232 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
238 /* Utility routine for queries. The database MUST be locked before calling here. */
240 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
244 sql_free_result(mdb);
245 if ((status=sql_query(mdb, cmd)) != 0) {
246 m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
247 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
249 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
254 mdb->result = sql_store_result(mdb);
256 return mdb->result != NULL;
260 * Utility routine to do inserts
261 * Returns: 0 on failure
265 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
267 if (sql_query(mdb, cmd)) {
268 m_msg(file, line, &mdb->errmsg, _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
269 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
271 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
275 if (mdb->have_insert_id) {
276 mdb->num_rows = sql_affected_rows(mdb);
280 if (mdb->num_rows != 1) {
282 m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
283 edit_uint64(mdb->num_rows, ed1));
285 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
293 /* Utility routine for updates.
294 * Returns: 0 on failure
298 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
301 if (sql_query(mdb, cmd)) {
302 m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
303 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
305 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
309 mdb->num_rows = sql_affected_rows(mdb);
310 if (mdb->num_rows < 1) {
312 m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
313 edit_uint64(mdb->num_rows, ed1), cmd);
315 // j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
323 /* Utility routine for deletes
325 * Returns: -1 on error
326 * n number of rows affected
329 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
332 if (sql_query(mdb, cmd)) {
333 m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
334 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
336 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
341 return sql_affected_rows(mdb);
346 * Get record max. Query is already in mdb->cmd
349 * Returns: -1 on failure
352 int get_sql_record_max(JCR *jcr, B_DB *mdb)
357 if (QUERY_DB(jcr, mdb, mdb->cmd)) {
358 if ((row = sql_fetch_row(mdb)) == NULL) {
359 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
362 stat = str_to_int64(row[0]);
364 sql_free_result(mdb);
366 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
373 * Return pre-edited error message
375 char *db_strerror(B_DB *mdb)
381 * Lock database, this can be called multiple times by the same
382 * thread without blocking, but must be unlocked the number of
383 * times it was locked.
385 void _db_lock(const char *file, int line, B_DB *mdb)
388 if ((errstat=rwl_writelock_p(&mdb->lock, file, line)) != 0) {
390 e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
391 errstat, be.bstrerror(errstat));
396 * Unlock the database. This can be called multiple times by the
397 * same thread up to the number of times that thread called
400 void _db_unlock(const char *file, int line, B_DB *mdb)
403 if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
405 e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
406 errstat, be.bstrerror(errstat));
411 * Start a transaction. This groups inserts and makes things
412 * much more efficient. Usually started when inserting
415 void db_start_transaction(JCR *jcr, B_DB *mdb)
418 jcr->attr = get_pool_memory(PM_FNAME);
421 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
425 if (!mdb->allow_transactions) {
429 /* Allow only 10,000 changes per transaction */
430 if (mdb->transaction && mdb->changes > 10000) {
431 db_end_transaction(jcr, mdb);
433 if (!mdb->transaction) {
434 my_sqlite_query(mdb, "BEGIN"); /* begin transaction */
435 Dmsg0(400, "Start SQLite transaction\n");
436 mdb->transaction = 1;
442 * This is turned off because transactions break
443 * if multiple simultaneous jobs are run.
445 #ifdef HAVE_POSTGRESQL
446 if (!mdb->allow_transactions) {
450 /* Allow only 25,000 changes per transaction */
451 if (mdb->transaction && mdb->changes > 25000) {
452 db_end_transaction(jcr, mdb);
454 if (!mdb->transaction) {
455 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
456 Dmsg0(400, "Start PosgreSQL transaction\n");
457 mdb->transaction = 1;
463 if (!mdb->allow_transactions) {
467 /* Allow only 25,000 changes per transaction */
468 if (mdb->transaction && mdb->changes > 25000) {
469 db_end_transaction(jcr, mdb);
471 if (!mdb->transaction) {
472 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
473 Dmsg0(400, "Start Ingres transaction\n");
474 mdb->transaction = 1;
480 if (db_type == SQL_TYPE_SQLITE) {
481 if (!mdb->allow_transactions) {
485 /* Allow only 10,000 changes per transaction */
486 if (mdb->transaction && mdb->changes > 10000) {
487 db_end_transaction(jcr, mdb);
489 if (!mdb->transaction) {
490 //my_sqlite_query(mdb, "BEGIN"); /* begin transaction */
491 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
492 Dmsg0(400, "Start SQLite transaction\n");
493 mdb->transaction = 1;
496 } else if (db_type == SQL_TYPE_POSTGRESQL) {
497 if (!mdb->allow_transactions) {
501 /* Allow only 25,000 changes per transaction */
502 if (mdb->transaction && mdb->changes > 25000) {
503 db_end_transaction(jcr, mdb);
505 if (!mdb->transaction) {
506 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
507 Dmsg0(400, "Start PosgreSQL transaction\n");
508 mdb->transaction = 1;
515 void db_end_transaction(JCR *jcr, B_DB *mdb)
518 * This can be called during thread cleanup and
519 * the db may already be closed. So simply return.
525 if (jcr && jcr->cached_attribute) {
526 Dmsg0(400, "Flush last cached attribute.\n");
527 if (!db_create_attributes_record(jcr, mdb, jcr->ar)) {
528 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
530 jcr->cached_attribute = false;
534 if (!mdb->allow_transactions) {
538 if (mdb->transaction) {
539 my_sqlite_query(mdb, "COMMIT"); /* end transaction */
540 mdb->transaction = 0;
541 Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
550 if (!mdb->allow_transactions) {
554 if (mdb->transaction) {
555 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
556 mdb->transaction = 0;
557 Dmsg1(400, "End Ingres transaction changes=%d\n", mdb->changes);
564 #ifdef HAVE_POSTGRESQL
565 if (!mdb->allow_transactions) {
569 if (mdb->transaction) {
570 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
571 mdb->transaction = 0;
572 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
579 if (db_type == SQL_TYPE_SQLITE) {
580 if (!mdb->allow_transactions) {
584 if (mdb->transaction) {
585 //my_sqlite_query(mdb, "COMMIT"); /* end transaction */
586 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
587 mdb->transaction = 0;
588 Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
592 } else if (db_type == SQL_TYPE_POSTGRESQL) {
593 if (!mdb->allow_transactions) {
597 if (mdb->transaction) {
598 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
599 mdb->transaction = 0;
600 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
609 * Given a full filename, split it into its path
610 * and filename parts. They are returned in pool memory
611 * in the mdb structure.
613 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
617 /* Find path without the filename.
618 * I.e. everything after the last / is a "filename".
619 * OK, maybe it is a directory name, but we treat it like
620 * a filename. If we don't find a / then the whole name
621 * must be a path name (e.g. c:).
623 for (p=f=fname; *p; p++) {
624 if (IsPathSeparator(*p)) {
625 f = p; /* set pos of last slash */
628 if (IsPathSeparator(*f)) { /* did we find a slash? */
629 f++; /* yes, point to filename */
630 } else { /* no, whole thing must be path name */
634 /* If filename doesn't exist (i.e. root directory), we
635 * simply create a blank name consisting of a single
636 * space. This makes handling zero length filenames
641 mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
642 memcpy(mdb->fname, f, mdb->fnl); /* copy filename */
643 mdb->fname[mdb->fnl] = 0;
649 mdb->pnl = f - fname;
651 mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
652 memcpy(mdb->path, fname, mdb->pnl);
653 mdb->path[mdb->pnl] = 0;
655 Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
656 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
661 Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
665 * Set maximum field length to something reasonable
667 static int max_length(int max_length)
669 int max_len = max_length;
673 } else if (max_len > 100) {
680 * List dashes as part of header for listing SQL results in a table
683 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
689 sql_field_seek(mdb, 0);
691 for (i = 0; i < sql_num_fields(mdb); i++) {
692 field = sql_fetch_field(mdb);
696 len = max_length(field->max_length + 2);
697 for (j = 0; j < len; j++) {
706 * If full_list is set, we list vertically, otherwise, we
707 * list on one line horizontally.
710 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
714 int i, col_len, max_len = 0;
715 char buf[2000], ewc[30];
717 Dmsg0(800, "list_result starts\n");
718 if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
719 send(ctx, _("No results to list.\n"));
723 Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
724 /* determine column display widths */
725 sql_field_seek(mdb, 0);
726 for (i = 0; i < sql_num_fields(mdb); i++) {
727 Dmsg1(800, "list_result processing field %d\n", i);
728 field = sql_fetch_field(mdb);
732 col_len = cstrlen(field->name);
733 if (type == VERT_LIST) {
734 if (col_len > max_len) {
738 if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
739 field->max_length += (field->max_length - 1) / 3;
741 if (col_len < (int)field->max_length) {
742 col_len = field->max_length;
744 if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
745 col_len = 4; /* 4 = length of the word "NULL" */
747 field->max_length = col_len; /* reset column info */
751 Dmsg0(800, "list_result finished first loop\n");
752 if (type == VERT_LIST) {
756 Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
757 list_dashes(mdb, send, ctx);
759 sql_field_seek(mdb, 0);
760 for (i = 0; i < sql_num_fields(mdb); i++) {
761 Dmsg1(800, "list_result looking at field %d\n", i);
762 field = sql_fetch_field(mdb);
766 max_len = max_length(field->max_length);
767 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, field->name);
771 list_dashes(mdb, send, ctx);
773 Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
774 while ((row = sql_fetch_row(mdb)) != NULL) {
775 sql_field_seek(mdb, 0);
777 for (i = 0; i < sql_num_fields(mdb); i++) {
778 field = sql_fetch_field(mdb);
782 max_len = max_length(field->max_length);
783 if (row[i] == NULL) {
784 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, "NULL");
785 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
786 bsnprintf(buf, sizeof(buf), " %*s |", max_len,
787 add_commas(row[i], ewc));
789 bsnprintf(buf, sizeof(buf), " %-*s |", max_len, row[i]);
795 list_dashes(mdb, send, ctx);
800 Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
801 while ((row = sql_fetch_row(mdb)) != NULL) {
802 sql_field_seek(mdb, 0);
803 for (i = 0; i < sql_num_fields(mdb); i++) {
804 field = sql_fetch_field(mdb);
808 if (row[i] == NULL) {
809 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
810 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
811 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
812 add_commas(row[i], ewc));
814 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
824 * Open a new connexion to mdb catalog. This function is used
825 * by batch and accurate mode.
827 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
829 #ifdef HAVE_BATCH_FILE_INSERT
830 const int multi_db = true; /* we force a new connection only if batch insert is enabled */
832 const int multi_db = false;
835 if (!jcr->db_batch) {
836 jcr->db_batch = db_init_database(jcr,
843 multi_db /* multi_db = true when using batch mode */);
844 if (!jcr->db_batch) {
845 Mmsg0(&mdb->errmsg, _("Could not init database batch connection"));
846 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
850 if (!db_open_database(jcr, jcr->db_batch)) {
851 Mmsg2(&mdb->errmsg, _("Could not open database \"%s\": ERR=%s\n"),
852 jcr->db_batch->db_name, db_strerror(jcr->db_batch));
853 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
856 Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
857 jcr->db_batch->connected, jcr->db_batch->db);
864 * !!! WARNING !!! Use this function only when bacula is stopped.
865 * ie, after a fatal signal and before exiting the program
866 * Print information about a B_DB object.
868 void db_debug_print(JCR *jcr, FILE *fp)
876 fprintf(fp, "B_DB=%p db_name=%s db_user=%s connected=%i\n",
877 mdb, NPRTB(mdb->db_name), NPRTB(mdb->db_user), mdb->connected);
878 fprintf(fp, "\tcmd=\"%s\" changes=%i\n", NPRTB(mdb->cmd), mdb->changes);
879 if (mdb->lock.valid == RWLOCK_VALID) {
880 fprintf(fp, "\tRWLOCK=%p w_active=%i w_wait=%i\n", &mdb->lock, mdb->lock.w_active, mdb->lock.w_wait);
884 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_INGRES*/