2 * Bacula Catalog Database interface routines
4 * Almost generic set of SQL database interface routines
5 * (with a little more work)
7 * Kern Sibbald, March 2000
13 Copyright (C) 2000-2006 Kern Sibbald
15 This program is free software; you can redistribute it and/or
16 modify it under the terms of the GNU General Public License
17 version 2 as amended with additional clauses defined in the
18 file LICENSE in the main source directory.
20 This program is distributed in the hope that it will be useful,
21 but WITHOUT ANY WARRANTY; without even the implied warranty of
22 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 the file LICENSE for additional details.
27 /* The following is necessary so that we do not include
28 * the dummy external definition of B_DB.
30 #define __SQL_C /* indicate that this is sql.c */
35 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL
37 uint32_t bacula_db_version = 0;
39 /* Forward referenced subroutines */
40 void print_dashes(B_DB *mdb);
41 void print_result(B_DB *mdb);
44 * Called here to retrieve an integer from the database
46 static int int_handler(void *ctx, int num_fields, char **row)
48 uint32_t *val = (uint32_t *)ctx;
50 Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
53 Dmsg1(800, "int_handler finds '%s'\n", row[0]);
54 *val = str_to_int64(row[0]);
56 Dmsg0(800, "int_handler finds zero\n");
59 Dmsg0(800, "int_handler finishes\n");
64 * Called here to retrieve a 32/64 bit integer from the database.
65 * The returned integer will be extended to 64 bit.
67 int db_int64_handler(void *ctx, int num_fields, char **row)
69 db_int64_ctx *lctx = (db_int64_ctx *)ctx;
72 lctx->value = str_to_int64(row[0]);
80 /* NOTE!!! The following routines expect that the
81 * calling subroutine sets and clears the mutex
84 /* Check that the tables correspond to the version we want */
85 bool check_tables_version(JCR *jcr, B_DB *mdb)
87 const char *query = "SELECT VersionId FROM Version";
89 bacula_db_version = 0;
90 if (!db_sql_query(mdb, query, int_handler, (void *)&bacula_db_version)) {
91 Mmsg(mdb->errmsg, "Database not created or server not running.\n");
92 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
95 if (bacula_db_version != BDB_VERSION) {
96 Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
97 mdb->db_name, BDB_VERSION, bacula_db_version);
98 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
104 /* Utility routine for queries. The database MUST be locked before calling here. */
106 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
109 if ((status=sql_query(mdb, cmd)) != 0) {
110 m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
111 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
113 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
118 mdb->result = sql_store_result(mdb);
120 return mdb->result != NULL;
124 * Utility routine to do inserts
125 * Returns: 0 on failure
129 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
131 if (sql_query(mdb, cmd)) {
132 m_msg(file, line, &mdb->errmsg, _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
133 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
135 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
139 if (mdb->have_insert_id) {
140 mdb->num_rows = sql_affected_rows(mdb);
144 if (mdb->num_rows != 1) {
146 m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
147 edit_uint64(mdb->num_rows, ed1));
149 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
157 /* Utility routine for updates.
158 * Returns: 0 on failure
162 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
165 if (sql_query(mdb, cmd)) {
166 m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
167 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
169 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
173 mdb->num_rows = sql_affected_rows(mdb);
174 if (mdb->num_rows < 1) {
176 m_msg(file, line, &mdb->errmsg, _("Update problem: affected_rows=%s\n"),
177 edit_uint64(mdb->num_rows, ed1));
179 // j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
187 /* Utility routine for deletes
189 * Returns: -1 on error
190 * n number of rows affected
193 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
196 if (sql_query(mdb, cmd)) {
197 m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
198 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
200 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
205 return sql_affected_rows(mdb);
210 * Get record max. Query is already in mdb->cmd
213 * Returns: -1 on failure
216 int get_sql_record_max(JCR *jcr, B_DB *mdb)
221 if (QUERY_DB(jcr, mdb, mdb->cmd)) {
222 if ((row = sql_fetch_row(mdb)) == NULL) {
223 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
226 stat = str_to_int64(row[0]);
228 sql_free_result(mdb);
230 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
237 * Return pre-edited error message
239 char *db_strerror(B_DB *mdb)
245 * Lock database, this can be called multiple times by the same
246 * thread without blocking, but must be unlocked the number of
247 * times it was locked.
249 void _db_lock(const char *file, int line, B_DB *mdb)
252 if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
254 e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
255 errstat, be.strerror(errstat));
260 * Unlock the database. This can be called multiple times by the
261 * same thread up to the number of times that thread called
264 void _db_unlock(const char *file, int line, B_DB *mdb)
267 if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
269 e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
270 errstat, be.strerror(errstat));
275 * Start a transaction. This groups inserts and makes things
276 * much more efficient. Usually started when inserting
279 void db_start_transaction(JCR *jcr, B_DB *mdb)
282 jcr->attr = get_pool_memory(PM_FNAME);
285 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
289 if (!mdb->allow_transactions) {
293 /* Allow only 10,000 changes per transaction */
294 if (mdb->transaction && mdb->changes > 10000) {
295 db_end_transaction(jcr, mdb);
297 if (!mdb->transaction) {
298 my_sqlite_query(mdb, "BEGIN"); /* begin transaction */
299 Dmsg0(400, "Start SQLite transaction\n");
300 mdb->transaction = 1;
306 * This is turned off because transactions break
307 * if multiple simultaneous jobs are run.
309 #ifdef HAVE_POSTGRESQL
310 if (!mdb->allow_transactions) {
314 /* Allow only 25,000 changes per transaction */
315 if (mdb->transaction && mdb->changes > 25000) {
316 db_end_transaction(jcr, mdb);
318 if (!mdb->transaction) {
319 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
320 Dmsg0(400, "Start PosgreSQL transaction\n");
321 mdb->transaction = 1;
327 void db_end_transaction(JCR *jcr, B_DB *mdb)
330 * This can be called during thread cleanup and
331 * the db may already be closed. So simply return.
337 if (jcr && jcr->cached_attribute) {
338 Dmsg0(400, "Flush last cached attribute.\n");
339 if (!db_create_file_attributes_record(jcr, mdb, jcr->ar)) {
340 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
342 jcr->cached_attribute = false;
346 if (!mdb->allow_transactions) {
350 if (mdb->transaction) {
351 my_sqlite_query(mdb, "COMMIT"); /* end transaction */
352 mdb->transaction = 0;
353 Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
359 #ifdef HAVE_POSTGRESQL
360 if (!mdb->allow_transactions) {
364 if (mdb->transaction) {
365 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
366 mdb->transaction = 0;
367 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
375 * Given a full filename, split it into its path
376 * and filename parts. They are returned in pool memory
377 * in the mdb structure.
379 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
383 /* Find path without the filename.
384 * I.e. everything after the last / is a "filename".
385 * OK, maybe it is a directory name, but we treat it like
386 * a filename. If we don't find a / then the whole name
387 * must be a path name (e.g. c:).
389 for (p=f=fname; *p; p++) {
391 f = p; /* set pos of last slash */
394 if (*f == '/') { /* did we find a slash? */
395 f++; /* yes, point to filename */
396 } else { /* no, whole thing must be path name */
400 /* If filename doesn't exist (i.e. root directory), we
401 * simply create a blank name consisting of a single
402 * space. This makes handling zero length filenames
407 mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
408 memcpy(mdb->fname, f, mdb->fnl); /* copy filename */
409 mdb->fname[mdb->fnl] = 0;
415 mdb->pnl = f - fname;
417 mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
418 memcpy(mdb->path, fname, mdb->pnl);
419 mdb->path[mdb->pnl] = 0;
421 Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
422 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
427 Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
431 * List dashes as part of header for listing SQL results in a table
434 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
439 sql_field_seek(mdb, 0);
441 for (i = 0; i < sql_num_fields(mdb); i++) {
442 field = sql_fetch_field(mdb);
443 for (j = 0; j < (int)field->max_length + 2; j++) {
452 * If full_list is set, we list vertically, otherwise, we
453 * list on one line horizontally.
456 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
460 int i, col_len, max_len = 0;
461 char buf[2000], ewc[30];
463 Dmsg0(800, "list_result starts\n");
464 if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
465 send(ctx, _("No results to list.\n"));
469 Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
470 /* determine column display widths */
471 sql_field_seek(mdb, 0);
472 for (i = 0; i < sql_num_fields(mdb); i++) {
473 Dmsg1(800, "list_result processing field %d\n", i);
474 field = sql_fetch_field(mdb);
475 col_len = cstrlen(field->name);
476 if (type == VERT_LIST) {
477 if (col_len > max_len) {
481 if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
482 field->max_length += (field->max_length - 1) / 3;
484 if (col_len < (int)field->max_length) {
485 col_len = field->max_length;
487 if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
488 col_len = 4; /* 4 = length of the word "NULL" */
490 field->max_length = col_len; /* reset column info */
494 Dmsg0(800, "list_result finished first loop\n");
495 if (type == VERT_LIST) {
499 Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
500 list_dashes(mdb, send, ctx);
502 sql_field_seek(mdb, 0);
503 for (i = 0; i < sql_num_fields(mdb); i++) {
504 Dmsg1(800, "list_result looking at field %d\n", i);
505 field = sql_fetch_field(mdb);
506 bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, field->name);
510 list_dashes(mdb, send, ctx);
512 Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
513 while ((row = sql_fetch_row(mdb)) != NULL) {
514 sql_field_seek(mdb, 0);
516 for (i = 0; i < sql_num_fields(mdb); i++) {
517 field = sql_fetch_field(mdb);
518 if (row[i] == NULL) {
519 bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, "NULL");
520 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
521 bsnprintf(buf, sizeof(buf), " %*s |", (int)field->max_length,
522 add_commas(row[i], ewc));
524 bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, row[i]);
530 list_dashes(mdb, send, ctx);
535 Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
536 while ((row = sql_fetch_row(mdb)) != NULL) {
537 sql_field_seek(mdb, 0);
538 for (i = 0; i < sql_num_fields(mdb); i++) {
539 field = sql_fetch_field(mdb);
540 if (row[i] == NULL) {
541 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
542 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
543 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
544 add_commas(row[i], ewc));
546 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
556 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/