2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of John Walker.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to PostgreSQL
30 * These are PostgreSQL specific routines
32 * Dan Langille, December 2003
33 * based upon work done by Kern Sibbald, March 2000
39 /* The following is necessary so that we do not include
40 * the dummy external definition of DB.
42 #define __SQL_C /* indicate that this is sql.c */
47 #ifdef HAVE_POSTGRESQL
49 #include "postgres_ext.h" /* needed for NAMEDATALEN */
50 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
52 /* -----------------------------------------------------------------------
54 * PostgreSQL dependent defines and subroutines
56 * -----------------------------------------------------------------------
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
65 * Retrieve database type
75 * Initialize database data structure. In principal this should
76 * never have errors, or it is really fatal.
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80 const char *db_address, int db_port, const char *db_socket,
81 int mult_db_connections)
86 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
89 P(mutex); /* lock DB queue */
90 if (!mult_db_connections) {
91 /* Look to see if DB already open */
92 for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93 if (bstrcmp(mdb->db_name, db_name) &&
94 bstrcmp(mdb->db_address, db_address) &&
95 mdb->db_port == db_port) {
96 Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
99 return mdb; /* already open */
103 Dmsg0(100, "db_open first time\n");
104 mdb = (B_DB *)malloc(sizeof(B_DB));
105 memset(mdb, 0, sizeof(B_DB));
106 mdb->db_name = bstrdup(db_name);
107 mdb->db_user = bstrdup(db_user);
109 mdb->db_password = bstrdup(db_password);
112 mdb->db_address = bstrdup(db_address);
115 mdb->db_socket = bstrdup(db_socket);
117 mdb->db_port = db_port;
118 mdb->have_insert_id = TRUE;
119 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
121 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
122 mdb->cached_path = get_pool_memory(PM_FNAME);
123 mdb->cached_path_id = 0;
125 mdb->fname = get_pool_memory(PM_FNAME);
126 mdb->path = get_pool_memory(PM_FNAME);
127 mdb->esc_name = get_pool_memory(PM_FNAME);
128 mdb->esc_path = get_pool_memory(PM_FNAME);
129 mdb->allow_transactions = mult_db_connections;
130 qinsert(&db_list, &mdb->bq); /* put db in list */
136 * Now actually open the database. This can generate errors,
137 * which are returned in the errmsg
139 * DO NOT close the database or free(mdb) here !!!!
142 db_open_database(JCR *jcr, B_DB *mdb)
148 if (!PQisthreadsafe()) {
149 Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "
150 "PostgreSQL library is not thread safe. Connot continue.\n"));
154 if (mdb->connected) {
158 mdb->connected = false;
160 if ((errstat=rwl_init(&mdb->lock)) != 0) {
162 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
163 be.bstrerror(errstat));
169 bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
175 /* If connection fails, try at 5 sec intervals for 30 seconds. */
176 for (int retry=0; retry < 6; retry++) {
177 /* connect to the database */
178 mdb->db = PQsetdbLogin(
179 mdb->db_address, /* default = localhost */
180 port, /* default port */
181 NULL, /* pg options */
182 NULL, /* tty, ignored */
183 mdb->db_name, /* database name */
184 mdb->db_user, /* login name */
185 mdb->db_password); /* password */
187 /* If no connect, try once more in case it is a timing problem */
188 if (PQstatus(mdb->db) == CONNECTION_OK) {
194 Dmsg0(50, "pg_real_connect done\n");
195 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
196 mdb->db_password==NULL?"(NULL)":mdb->db_password);
198 if (PQstatus(mdb->db) != CONNECTION_OK) {
199 Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
200 "Database=%s User=%s\n"
201 "It is probably not running or your password is incorrect.\n"),
202 mdb->db_name, mdb->db_user);
207 mdb->connected = true;
209 if (!check_tables_version(jcr, mdb)) {
214 sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
216 /* tell PostgreSQL we are using standard conforming strings
217 and avoid warnings such as:
218 WARNING: nonstandard use of \\ in a string literal
220 sql_query(mdb, "set standard_conforming_strings=on");
227 db_close_database(JCR *jcr, B_DB *mdb)
232 db_end_transaction(jcr, mdb);
234 sql_free_result(mdb);
236 if (mdb->ref_count == 0) {
238 if (mdb->connected && mdb->db) {
241 rwl_destroy(&mdb->lock);
242 free_pool_memory(mdb->errmsg);
243 free_pool_memory(mdb->cmd);
244 free_pool_memory(mdb->cached_path);
245 free_pool_memory(mdb->fname);
246 free_pool_memory(mdb->path);
247 free_pool_memory(mdb->esc_name);
248 free_pool_memory(mdb->esc_path);
255 if (mdb->db_password) {
256 free(mdb->db_password);
258 if (mdb->db_address) {
259 free(mdb->db_address);
261 if (mdb->db_socket) {
262 free(mdb->db_socket);
269 void db_thread_cleanup()
273 * Return the next unique index (auto-increment) for
274 * the given table. Return NULL on error.
276 * For PostgreSQL, NULL causes the auto-increment value
279 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
281 strcpy(index, "NULL");
287 * Escape strings so that PostgreSQL is happy
289 * NOTE! len is the length of the old string. Your new
290 * string must be long enough (max 2*old+1) to hold
291 * the escaped output.
294 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
298 PQescapeStringConn(mdb->db, snew, old, len, &error);
300 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
301 /* error on encoding, probably invalid multibyte encoding in the source string
302 see PQescapeStringConn documentation for details. */
303 Dmsg0(500, "PQescapeStringConn failed\n");
308 * Submit a general SQL command (cmd), and for each row returned,
309 * the sqlite_handler is called with the ctx.
311 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
315 Dmsg0(500, "db_sql_query started\n");
318 if (sql_query(mdb, query) != 0) {
319 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
321 Dmsg0(500, "db_sql_query failed\n");
324 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
326 if (result_handler != NULL) {
327 Dmsg0(500, "db_sql_query invoking handler\n");
328 if ((mdb->result = sql_store_result(mdb)) != NULL) {
329 int num_fields = sql_num_fields(mdb);
331 Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
332 while ((row = sql_fetch_row(mdb)) != NULL) {
334 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
335 if (result_handler(ctx, num_fields, row))
339 sql_free_result(mdb);
344 Dmsg0(500, "db_sql_query finished\n");
351 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
354 POSTGRESQL_ROW row = NULL; // by default, return NULL
356 Dmsg0(500, "my_postgresql_fetch_row start\n");
358 if (!mdb->row || mdb->row_size < mdb->num_fields) {
359 int num_fields = mdb->num_fields;
360 Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
363 Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
366 num_fields += 20; /* add a bit extra */
367 mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
368 mdb->row_size = num_fields;
370 // now reset the row_number now that we have the space allocated
374 // if still within the result set
375 if (mdb->row_number < mdb->num_rows) {
376 Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
377 // get each value from this row
378 for (j = 0; j < mdb->num_fields; j++) {
379 mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
380 Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
382 // increment the row number for the next call
387 Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
390 Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
395 int my_postgresql_max_length(B_DB *mdb, int field_num) {
397 // for a given column, find the max length
404 for (i = 0; i < mdb->num_rows; i++) {
405 if (PQgetisnull(mdb->result, i, field_num)) {
406 this_length = 4; // "NULL"
408 this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
411 if (max_length < this_length) {
412 max_length = this_length;
419 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
423 Dmsg0(500, "my_postgresql_fetch_field starts\n");
425 if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
429 Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
430 mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
431 mdb->fields_size = mdb->num_fields;
433 for (i = 0; i < mdb->num_fields; i++) {
434 Dmsg1(500, "filling field %d\n", i);
435 mdb->fields[i].name = PQfname(mdb->result, i);
436 mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
437 mdb->fields[i].type = PQftype(mdb->result, i);
438 mdb->fields[i].flags = 0;
440 Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
441 mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
442 mdb->fields[i].flags);
446 // increment field number for the next time around
448 Dmsg0(500, "my_postgresql_fetch_field finishes\n");
449 return &mdb->fields[mdb->field_number++];
452 void my_postgresql_data_seek(B_DB *mdb, int row)
454 // set the row number to be returned on the next call
455 // to my_postgresql_fetch_row
456 mdb->row_number = row;
459 void my_postgresql_field_seek(B_DB *mdb, int field)
461 mdb->field_number = field;
465 * Note, if this routine returns 1 (failure), Bacula expects
466 * that no result has been stored.
467 * This is where QUERY_DB comes with Postgresql.
469 * Returns: 0 on success
473 int my_postgresql_query(B_DB *mdb, const char *query)
475 Dmsg0(500, "my_postgresql_query started\n");
476 // We are starting a new query. reset everything.
478 mdb->row_number = -1;
479 mdb->field_number = -1;
482 PQclear(mdb->result); /* hmm, someone forgot to free?? */
486 Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
488 for (int i=0; i < 10; i++) {
489 mdb->result = PQexec(mdb->db, query);
496 Dmsg1(50, "Query failed: %s\n", query);
500 mdb->status = PQresultStatus(mdb->result);
501 if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
502 Dmsg1(500, "we have a result\n", query);
504 // how many fields in the set?
505 mdb->num_fields = (int)PQnfields(mdb->result);
506 Dmsg1(500, "we have %d fields\n", mdb->num_fields);
508 mdb->num_rows = PQntuples(mdb->result);
509 Dmsg1(500, "we have %d rows\n", mdb->num_rows);
511 mdb->status = 0; /* succeed */
513 Dmsg1(50, "Result status failed: %s\n", query);
517 Dmsg0(500, "my_postgresql_query finishing\n");
521 Dmsg1(500, "we failed\n", query);
522 PQclear(mdb->result);
524 mdb->status = 1; /* failed */
528 void my_postgresql_free_result(B_DB *mdb)
533 PQclear(mdb->result);
549 int my_postgresql_currval(B_DB *mdb, const char *table_name)
551 // Obtain the current value of the sequence that
552 // provides the serial value for primary key of the table.
554 // currval is local to our session. It is not affected by
555 // other transactions.
557 // Determine the name of the sequence.
558 // PostgreSQL automatically creates a sequence using
559 // <table>_<column>_seq.
560 // At the time of writing, all tables used this format for
561 // for their primary key: <table>id
562 // Except for basefiles which has a primary key on baseid.
563 // Therefore, we need to special case that one table.
565 // everything else can use the PostgreSQL formula.
567 char sequence[NAMEDATALEN-1];
568 char query [NAMEDATALEN+50];
572 if (strcasecmp(table_name, "basefiles") == 0) {
573 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
575 bstrncpy(sequence, table_name, sizeof(sequence));
576 bstrncat(sequence, "_", sizeof(sequence));
577 bstrncat(sequence, table_name, sizeof(sequence));
578 bstrncat(sequence, "id", sizeof(sequence));
581 bstrncat(sequence, "_seq", sizeof(sequence));
582 bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
584 Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
585 for (int i=0; i < 10; i++) {
586 result = PQexec(mdb->db, query);
593 Dmsg1(50, "Query failed: %s\n", query);
597 Dmsg0(500, "exec done");
599 if (PQresultStatus(result) == PGRES_TUPLES_OK) {
600 Dmsg0(500, "getting value");
601 id = atoi(PQgetvalue(result, 0, 0));
602 Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
604 Dmsg1(50, "Result status failed: %s\n", query);
605 Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
614 #ifdef HAVE_BATCH_FILE_INSERT
616 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
618 char *query = "COPY batch FROM STDIN";
620 Dmsg0(500, "my_postgresql_batch_start started\n");
622 if (my_postgresql_query(mdb,
623 "CREATE TEMPORARY TABLE batch ("
629 "md5 varchar)") == 1)
631 Dmsg0(500, "my_postgresql_batch_start failed\n");
635 // We are starting a new query. reset everything.
637 mdb->row_number = -1;
638 mdb->field_number = -1;
640 my_postgresql_free_result(mdb);
642 for (int i=0; i < 10; i++) {
643 mdb->result = PQexec(mdb->db, query);
650 Dmsg1(50, "Query failed: %s\n", query);
654 mdb->status = PQresultStatus(mdb->result);
655 if (mdb->status == PGRES_COPY_IN) {
656 // how many fields in the set?
657 mdb->num_fields = (int) PQnfields(mdb->result);
661 Dmsg1(50, "Result status failed: %s\n", query);
665 Dmsg0(500, "my_postgresql_batch_start finishing\n");
670 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
672 PQclear(mdb->result);
677 /* set error to something to abort operation */
678 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
682 Dmsg0(500, "my_postgresql_batch_end started\n");
684 if (!mdb) { /* no files ? */
689 res = PQputCopyEnd(mdb->db, error);
690 } while (res == 0 && --count > 0);
698 Dmsg0(500, "we failed\n");
700 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
703 Dmsg0(500, "my_postgresql_batch_end finishing\n");
708 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
716 mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
717 my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
719 mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
720 my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
722 if (ar->Digest == NULL || ar->Digest[0] == 0) {
728 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
729 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
730 mdb->esc_name, ar->attr, digest);
733 res = PQputCopyData(mdb->db,
736 } while (res == 0 && --count > 0);
745 Dmsg0(500, "we failed\n");
747 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
750 Dmsg0(500, "my_postgresql_batch_insert finishing\n");
755 #endif /* HAVE_BATCH_FILE_INSERT */
758 * Escape strings so that PostgreSQL is happy on COPY
760 * NOTE! len is the length of the old string. Your new
761 * string must be long enough (max 2*old+1) to hold
762 * the escaped output.
764 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
766 /* we have to escape \t, \n, \r, \ */
769 while (len > 0 && *src) {
804 #ifdef HAVE_BATCH_FILE_INSERT
805 const char *my_pg_batch_lock_path_query =
806 "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
809 const char *my_pg_batch_lock_filename_query =
810 "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
812 const char *my_pg_batch_unlock_tables_query = "COMMIT";
814 const char *my_pg_batch_fill_path_query =
815 "INSERT INTO Path (Path) "
816 "SELECT a.Path FROM "
817 "(SELECT DISTINCT Path FROM batch) AS a "
818 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
821 const char *my_pg_batch_fill_filename_query =
822 "INSERT INTO Filename (Name) "
823 "SELECT a.Name FROM "
824 "(SELECT DISTINCT Name FROM batch) as a "
826 "(SELECT Name FROM Filename WHERE Name = a.Name)";
827 #endif /* HAVE_BATCH_FILE_INSERT */
829 #endif /* HAVE_POSTGRESQL */