2 * Bacula Catalog Database interface routines
4 * Almost generic set of SQL database interface routines
5 * (with a little more work)
7 * Kern Sibbald, March 2000
13 Copyright (C) 2000-2006 Kern Sibbald
15 This program is free software; you can redistribute it and/or
16 modify it under the terms of the GNU General Public License
17 version 2 as amended with additional clauses defined in the
18 file LICENSE in the main source directory.
20 This program is distributed in the hope that it will be useful,
21 but WITHOUT ANY WARRANTY; without even the implied warranty of
22 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
23 the file LICENSE for additional details.
27 /* The following is necessary so that we do not include
28 * the dummy external definition of B_DB.
30 #define __SQL_C /* indicate that this is sql.c */
/* Everything from here down to the matching #endif is compiled only when
 * at least one of the listed database backend macros is enabled. */
35 #if HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL
/* Version number read from the catalog. check_tables_version() resets it
 * to 0, has int_handler() fill it in from the Version table, and then
 * compares it with BDB_VERSION. */
37 uint32_t bacula_db_version = 0;
39 /* Forward referenced subroutines */
/* NOTE(review): no definitions named print_dashes or print_result appear in
 * the visible part of this file (the listing routines shown are list_dashes
 * and list_result) -- confirm these prototypes are still needed. */
40 void print_dashes(B_DB *mdb);
41 void print_result(B_DB *mdb);
44 * Called here to retrieve an integer from the database
/*
 * Row callback that stores an integer query result through ctx.
 *
 *   ctx        - cast to uint32_t *; receives the converted value
 *   num_fields - not referenced in the lines visible here
 *   row        - row[0] is logged and converted with str_to_int64()
 *
 * The result of str_to_int64() is assigned to a uint32_t.
 *
 * NOTE(review): the first Dmsg1 hands row (a char **) to a %x conversion;
 * confirm whether a pointer conversion or a cast is wanted.
 * NOTE(review): this listing omits some original lines of the body (the
 * braces, the test that selects the "finds zero" message, and the return
 * statement); consult the complete file before editing.
 */
46 static int int_handler(void *ctx, int num_fields, char **row)
48 uint32_t *val = (uint32_t *)ctx;
50 Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
53 Dmsg1(800, "int_handler finds '%s'\n", row[0]);
54 *val = str_to_int64(row[0]);
56 Dmsg0(800, "int_handler finds zero\n");
59 Dmsg0(800, "int_handler finishes\n");
65 /* NOTE!!! The following routines expect that the
66 * calling subroutine sets and clears the mutex
69 /* Check that the tables correspond to the version we want */
/*
 * Compare the version recorded in the catalog with BDB_VERSION.
 *
 *   jcr - job that receives the M_FATAL messages
 *   mdb - database handle; its errmsg and db_name members are used
 *
 * Resets bacula_db_version to 0, then runs "SELECT VersionId FROM Version"
 * through db_sql_query() with int_handler writing into bacula_db_version.
 *   - query failure: errmsg is set to "Database not created or server not
 *     running." and issued as an M_FATAL job message
 *   - version differs from BDB_VERSION: errmsg names the database, the
 *     wanted version and the version found, and is issued as M_FATAL
 *
 * NOTE(review): the return statements are not among the lines visible in
 * this listing, so the exact return values cannot be stated here.
 * NOTE(review): bacula_db_version is a uint32_t but is formatted with %d.
 * NOTE(review): Mmsg is given mdb->errmsg here, whereas other routines in
 * this file pass &mdb->errmsg -- confirm both forms are accepted.
 */
70 int check_tables_version(JCR *jcr, B_DB *mdb)
72 const char *query = "SELECT VersionId FROM Version";
74 bacula_db_version = 0;
75 if (!db_sql_query(mdb, query, int_handler, (void *)&bacula_db_version)) {
76 Mmsg(mdb->errmsg, "Database not created or server not running.\n");
77 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
79 if (bacula_db_version != BDB_VERSION) {
80 Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
81 mdb->db_name, BDB_VERSION, bacula_db_version);
82 Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
88 /* Utility routine for queries. The database MUST be locked before calling here. */
/*
 * Run the SQL statement in cmd and keep its result set in mdb->result.
 *
 *   file, line - forwarded to m_msg() and j_msg() when reporting
 *   jcr        - job the messages are issued against
 *   mdb        - database handle
 *   cmd        - SQL text handed to sql_query()
 *
 * When sql_query() returns non-zero, a text containing cmd and
 * sql_strerror(mdb) is built in mdb->errmsg and issued as an M_FATAL job
 * message. An M_INFO message echoing cmd follows; the condition guarding
 * it is not visible in this listing.
 * Otherwise the result of sql_store_result() is saved in mdb->result and
 * the function returns whether that pointer is non-NULL.
 *
 * NOTE(review): the return type line, the declaration of status and the
 * failure-path return are not among the lines visible here.
 */
90 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
93 if ((status=sql_query(mdb, cmd)) != 0) {
94 m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
95 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
97 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
102 mdb->result = sql_store_result(mdb);
104 return mdb->result != NULL;
108 * Utility routine to do inserts
109 * Returns: 0 on failure
/*
 * Run an INSERT statement and check how many rows it affected.
 *
 *   file, line - forwarded to m_msg() and j_msg() when reporting
 *   jcr        - job the messages are issued against
 *   mdb        - database handle; num_rows and errmsg are updated
 *   cmd        - SQL text handed to sql_query()
 *
 * When sql_query() returns non-zero, "insert ... failed" plus
 * sql_strerror(mdb) is stored in mdb->errmsg and issued as M_FATAL.
 * When mdb->have_insert_id is set, mdb->num_rows is taken from
 * sql_affected_rows(); what happens otherwise is not visible here.
 * A num_rows value other than 1 produces the "Insertion problem" text in
 * mdb->errmsg.
 * Both M_INFO messages echo cmd; their guarding conditions are not visible.
 *
 * NOTE(review): the return statements and the declaration of ed1 are not
 * among the lines visible in this listing.
 */
113 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
115 if (sql_query(mdb, cmd)) {
116 m_msg(file, line, &mdb->errmsg, _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
117 j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
119 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
123 if (mdb->have_insert_id) {
124 mdb->num_rows = sql_affected_rows(mdb);
128 if (mdb->num_rows != 1) {
130 m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
131 edit_uint64(mdb->num_rows, ed1));
133 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
141 /* Utility routine for updates.
142 * Returns: 0 on failure
/*
 * Run an UPDATE statement and check that it affected at least one row.
 *
 *   file, line - forwarded to m_msg() and j_msg() when reporting
 *   jcr        - job the messages are issued against
 *   mdb        - database handle; num_rows and errmsg are updated
 *   cmd        - SQL text handed to sql_query()
 *
 * When sql_query() returns non-zero, "update ... failed" plus
 * sql_strerror(mdb) is stored in mdb->errmsg and issued at M_ERROR (the
 * query and insert helpers in this file use M_FATAL instead).
 * Afterwards mdb->num_rows is set from sql_affected_rows(); a value below
 * 1 produces the "Update problem" text in mdb->errmsg. The echo of cmd on
 * that path is commented out.
 *
 * NOTE(review): the return statements and the declaration of ed1 are not
 * among the lines visible in this listing.
 */
146 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
149 if (sql_query(mdb, cmd)) {
150 m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
151 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
153 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
157 mdb->num_rows = sql_affected_rows(mdb);
158 if (mdb->num_rows < 1) {
160 m_msg(file, line, &mdb->errmsg, _("Update problem: affected_rows=%s\n"),
161 edit_uint64(mdb->num_rows, ed1));
163 // j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
171 /* Utility routine for deletes
173 * Returns: -1 on error
174 * n number of rows affected
/*
 * Run a DELETE statement and report how many rows it affected.
 *
 *   file, line - forwarded to m_msg() and j_msg() when reporting
 *   jcr        - job the messages are issued against
 *   mdb        - database handle; errmsg is updated on failure
 *   cmd        - SQL text handed to sql_query()
 *
 * When sql_query() returns non-zero, "delete ... failed" plus
 * sql_strerror(mdb) is stored in mdb->errmsg and issued at M_ERROR.
 * The last visible statement returns sql_affected_rows(mdb).
 *
 * NOTE(review): the failure-path return is not among the lines visible in
 * this listing, and neither is the condition guarding the M_INFO echo.
 */
177 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
180 if (sql_query(mdb, cmd)) {
181 m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
182 j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
184 j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
189 return sql_affected_rows(mdb);
194 * Get record max. Query is already in mdb->cmd
197 * Returns: -1 on failure
/*
 * Run the query already stored in mdb->cmd and convert the first column
 * of its first row to an integer.
 *
 *   jcr - job passed on to QUERY_DB
 *   mdb - database handle; cmd holds the query, errmsg receives errors
 *
 * If QUERY_DB succeeds, one row is fetched: a NULL row stores an
 * "error fetching row" text in mdb->errmsg, otherwise row[0] is converted
 * with str_to_int64() into stat. The result set is then released with
 * sql_free_result().
 *
 * NOTE(review): the second "error fetching row" message sits after
 * sql_free_result(); if it belongs to the branch where QUERY_DB itself
 * failed, its wording describes the wrong failure -- confirm against the
 * complete file.
 * NOTE(review): the declarations of row and stat and the return statement
 * are not among the lines visible in this listing.
 */
200 int get_sql_record_max(JCR *jcr, B_DB *mdb)
205 if (QUERY_DB(jcr, mdb, mdb->cmd)) {
206 if ((row = sql_fetch_row(mdb)) == NULL) {
207 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
210 stat = str_to_int64(row[0]);
212 sql_free_result(mdb);
214 Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
221 * Return pre-edited error message
/*
 * Takes a database handle and returns a char *. Elsewhere in this file
 * db_end_transaction() passes the returned pointer to a %s conversion.
 * NOTE(review): the body of this function is not among the lines visible
 * in this listing; presumably it hands back the handle's stored error
 * text -- confirm against the complete file.
 */
223 char *db_strerror(B_DB *mdb)
229 * Lock database, this can be called multiple times by the same
230 * thread without blocking, but must be unlocked the number of
231 * times it was locked.
/*
 * Acquire the write lock held in mdb->lock by calling rwl_writelock().
 *
 *   file, line - passed to e_msg() if the lock call fails
 *   mdb        - database handle whose lock member is taken
 *
 * A non-zero status from rwl_writelock() is reported through e_msg() at
 * M_FATAL, with the status number and be.strerror(errstat).
 *
 * NOTE(review): the declarations of errstat and be are not among the
 * lines visible in this listing.
 */
233 void _db_lock(const char *file, int line, B_DB *mdb)
236 if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
238 e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
239 errstat, be.strerror(errstat));
244 * Unlock the database. This can be called multiple times by the
245 * same thread up to the number of times that thread called
/*
 * Release the write lock held in mdb->lock by calling rwl_writeunlock().
 *
 *   file, line - passed to e_msg() if the unlock call fails
 *   mdb        - database handle whose lock member is released
 *
 * A non-zero status from rwl_writeunlock() is reported through e_msg() at
 * M_FATAL, with the status number and be.strerror(errstat).
 *
 * NOTE(review): the declarations of errstat and be are not among the
 * lines visible in this listing.
 */
248 void _db_unlock(const char *file, int line, B_DB *mdb)
251 if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
253 e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
254 errstat, be.strerror(errstat));
259 * Start a transaction. This groups inserts and makes things
260 * much more efficient. Usually started when inserting
/*
 * Open a database transaction for the job if one is not already open.
 *
 *   jcr - job; its attr and ar members are assigned here
 *   mdb - database handle; allow_transactions, transaction and changes
 *         are consulted, transaction is set to 1 once BEGIN is sent
 *
 * The visible lines contain two similar sequences. The first issues
 * "BEGIN" through my_sqlite_query() and caps a transaction at 10000
 * changes; the second, under HAVE_POSTGRESQL, issues it through
 * db_sql_query() and caps it at 25000. In both, a transaction that is
 * open and over its cap is first closed with db_end_transaction(), which
 * clears mdb->transaction, so that the BEGIN below starts a new one.
 *
 * NOTE(review): the conditions guarding the two jcr assignments, the body
 * of the !allow_transactions branches, and the preprocessor lines around
 * the first sequence are not among the lines visible in this listing.
 * NOTE(review): the malloc() result is not checked in the visible lines.
 */
263 void db_start_transaction(JCR *jcr, B_DB *mdb)
266 jcr->attr = get_pool_memory(PM_FNAME);
269 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
273 if (!mdb->allow_transactions) {
277 /* Allow only 10,000 changes per transaction */
278 if (mdb->transaction && mdb->changes > 10000) {
279 db_end_transaction(jcr, mdb);
281 if (!mdb->transaction) {
282 my_sqlite_query(mdb, "BEGIN"); /* begin transaction */
283 Dmsg0(400, "Start SQLite transaction\n");
284 mdb->transaction = 1;
290 * This is turned off because transactions break
291 * if multiple simultaneous jobs are run.
293 #ifdef HAVE_POSTGRESQL
294 if (!mdb->allow_transactions) {
298 /* Allow only 25,000 changes per transaction */
299 if (mdb->transaction && mdb->changes > 25000) {
300 db_end_transaction(jcr, mdb);
302 if (!mdb->transaction) {
/* NOTE(review): the debug text below spells the backend "PosgreSQL". */
303 db_sql_query(mdb, "BEGIN", NULL, NULL); /* begin transaction */
304 Dmsg0(400, "Start PosgreSQL transaction\n");
305 mdb->transaction = 1;
/*
 * Flush any cached file attribute for the job and commit the open
 * transaction.
 *
 *   jcr - job; may be NULL (it is tested before cached_attribute is read)
 *   mdb - database handle; transaction is cleared once COMMIT is sent
 *
 * If jcr has a cached attribute, it is written with
 * db_create_file_attributes_record(); a failure is issued as an M_FATAL
 * job message, and cached_attribute is set to false.
 * Then, when mdb->transaction is set, "COMMIT" is sent (through
 * my_sqlite_query() in the first sequence, db_sql_query() under
 * HAVE_POSTGRESQL), mdb->transaction is set to 0 and the change count is
 * logged.
 *
 * NOTE(review): the record is created through mdb, but the error text is
 * fetched with db_strerror(jcr->db) -- confirm the two always refer to
 * the same handle.
 * NOTE(review): the early-return test described by the comment fragment
 * below, the body of the !allow_transactions branches, and any reset of
 * mdb->changes are not among the lines visible in this listing.
 */
311 void db_end_transaction(JCR *jcr, B_DB *mdb)
314 * This can be called during thread cleanup and
315 * the db may already be closed. So simply return.
321 if (jcr && jcr->cached_attribute) {
322 Dmsg0(400, "Flush last cached attribute.\n");
323 if (!db_create_file_attributes_record(jcr, mdb, jcr->ar)) {
324 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
326 jcr->cached_attribute = false;
330 if (!mdb->allow_transactions) {
334 if (mdb->transaction) {
335 my_sqlite_query(mdb, "COMMIT"); /* end transaction */
336 mdb->transaction = 0;
337 Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
343 #ifdef HAVE_POSTGRESQL
344 if (!mdb->allow_transactions) {
348 if (mdb->transaction) {
349 db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
350 mdb->transaction = 0;
351 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
359 * Given a full filename, split it into its path
360 * and filename parts. They are returned in pool memory
361 * in the mdb structure.
/*
 * Split fname at its last '/' into a path part and a file name part and
 * store them in the database handle.
 *
 *   jcr   - job that receives the M_ERROR message for an empty path
 *   mdb   - database handle; fname/fnl and path/pnl are written
 *   fname - full name to split; it is only read
 *
 * The scan leaves f at the last '/' seen; if a slash was found f is moved
 * one past it so that it points at the file name.
 * mdb->fname is grown to fnl+1 bytes, receives fnl bytes starting at f and
 * is NUL-terminated. mdb->pnl is the offset of f within fname; mdb->path
 * is grown to pnl+1 bytes, receives the first pnl bytes of fname and is
 * NUL-terminated. A "Path length is zero" text is stored in mdb->errmsg
 * and issued at M_ERROR; the finished split is logged at debug level 500.
 *
 * NOTE(review): the declarations of p and f, the slash test inside the
 * loop, the computation of mdb->fnl, the conditions selecting each copy
 * and the handling of an empty name or path are not among the lines
 * visible in this listing. The in-code remark about storing "a single
 * space" for an empty file name therefore cannot be checked here.
 */
363 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
367 /* Find path without the filename.
368 * I.e. everything after the last / is a "filename".
369 * OK, maybe it is a directory name, but we treat it like
370 * a filename. If we don't find a / then the whole name
371 * must be a path name (e.g. c:).
373 for (p=f=fname; *p; p++) {
375 f = p; /* set pos of last slash */
378 if (*f == '/') { /* did we find a slash? */
379 f++; /* yes, point to filename */
380 } else { /* no, whole thing must be path name */
384 /* If filename doesn't exist (i.e. root directory), we
385 * simply create a blank name consisting of a single
386 * space. This makes handling zero length filenames
391 mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
392 memcpy(mdb->fname, f, mdb->fnl); /* copy filename */
393 mdb->fname[mdb->fnl] = 0;
399 mdb->pnl = f - fname;
401 mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
402 memcpy(mdb->path, fname, mdb->pnl);
403 mdb->path[mdb->pnl] = 0;
405 Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
406 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
411 Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
415 * List dashes as part of header for listing SQL results in a table
/*
 * Walk the fields of the current result set and, for each one, run an
 * inner loop of field->max_length + 2 iterations.
 *
 *   mdb  - database handle whose field metadata is read
 *   send - output handler supplied by the caller
 *   ctx  - opaque context for send
 *
 * The field cursor is rewound with sql_field_seek(mdb, 0) before the walk.
 *
 * NOTE(review): the statements that actually emit characters through send
 * (and the declarations of i, j and field) are not among the lines
 * visible in this listing; presumably one dash is sent per inner
 * iteration -- confirm against the complete file.
 */
418 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
423 sql_field_seek(mdb, 0);
425 for (i = 0; i < sql_num_fields(mdb); i++) {
426 field = sql_fetch_field(mdb);
427 for (j = 0; j < (int)field->max_length + 2; j++) {
436 * If type is VERT_LIST, we list vertically, otherwise, we
437 * list on one line horizontally.
/*
 * Format the result set held in mdb->result as text for the caller's
 * handler, either as rows of '|'-separated cells framed by list_dashes()
 * calls or, for VERT_LIST, as one "name: value" line per column.
 *
 *   jcr  - job; jcr->gui suppresses comma insertion in numbers
 *   mdb  - database handle holding the result set and field metadata
 *   send - output handler supplied by the caller
 *   ctx  - opaque context for send
 *   type - VERT_LIST selects the vertical layout
 *
 * Each piece of output is built in the 2000-byte buf with bsnprintf(),
 * bounded by sizeof(buf). Field max_length values are overwritten with
 * the computed display widths.
 *
 * NOTE(review): jcr is dereferenced (jcr->gui) without a NULL check in the
 * visible lines.
 * NOTE(review): the statements that pass buf to send, the declarations of
 * row and field, the update of max_len, and the control transfer between
 * the horizontal and vertical sections are not among the lines visible in
 * this listing.
 */
440 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
444 int i, col_len, max_len = 0;
445 char buf[2000], ewc[30];
447 Dmsg0(800, "list_result starts\n");
/* No result set, or an empty one: say so instead of printing a table. */
448 if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
449 send(ctx, _("No results to list.\n"));
453 Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
454 /* determine column display widths */
455 sql_field_seek(mdb, 0);
456 for (i = 0; i < sql_num_fields(mdb); i++) {
457 Dmsg1(800, "list_result processing field %d\n", i);
458 field = sql_fetch_field(mdb);
459 col_len = cstrlen(field->name);
/* Vertical layout only needs the widest column name (max_len). */
460 if (type == VERT_LIST) {
461 if (col_len > max_len) {
/* Numeric columns get one extra character per group of three digits. */
465 if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
466 field->max_length += (field->max_length - 1) / 3;
/* Width is the larger of the column name and the widest value. */
468 if (col_len < (int)field->max_length) {
469 col_len = field->max_length;
/* Columns not flagged NOT NULL must be able to show the text "NULL". */
471 if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
472 col_len = 4; /* 4 = length of the word "NULL" */
474 field->max_length = col_len; /* reset column info */
478 Dmsg0(800, "list_result finished first loop\n");
479 if (type == VERT_LIST) {
483 Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
484 list_dashes(mdb, send, ctx);
/* Header row: column names, left-aligned to the computed widths. */
486 sql_field_seek(mdb, 0);
487 for (i = 0; i < sql_num_fields(mdb); i++) {
488 Dmsg1(800, "list_result looking at field %d\n", i);
489 field = sql_fetch_field(mdb);
490 bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, field->name);
494 list_dashes(mdb, send, ctx);
496 Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
/* Data rows: NULL shown as "NULL"; integer values of numeric columns are
 * right-aligned with commas unless jcr->gui is set; all else left-aligned. */
497 while ((row = sql_fetch_row(mdb)) != NULL) {
498 sql_field_seek(mdb, 0);
500 for (i = 0; i < sql_num_fields(mdb); i++) {
501 field = sql_fetch_field(mdb);
502 if (row[i] == NULL) {
503 bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, "NULL");
504 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
505 bsnprintf(buf, sizeof(buf), " %*s |", (int)field->max_length,
506 add_commas(row[i], ewc));
508 bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, row[i]);
514 list_dashes(mdb, send, ctx);
519 Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
/* Vertical layout: one "name: value" line per column, names right-aligned
 * to max_len, with the same NULL and comma rules as above. */
520 while ((row = sql_fetch_row(mdb)) != NULL) {
521 sql_field_seek(mdb, 0);
522 for (i = 0; i < sql_num_fields(mdb); i++) {
523 field = sql_fetch_field(mdb);
524 if (row[i] == NULL) {
525 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
526 } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
527 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
528 add_commas(row[i], ewc));
530 bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
540 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/