2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to PostgreSQL
30 * These are PostgreSQL specific routines
32 * Dan Langille, December 2003
33 * based upon work done by Kern Sibbald, March 2000
38 /* The following is necessary so that we do not include
39 * the dummy external definition of DB.
41 #define __SQL_C /* indicate that this is sql.c */
46 #ifdef HAVE_POSTGRESQL
48 #include "postgres_ext.h" /* needed for NAMEDATALEN */
49 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
51 /* -----------------------------------------------------------------------
53 * PostgreSQL dependent defines and subroutines
55 * -----------------------------------------------------------------------
58 /* List of open databases */
/* List of B_DB handles; it is created on first use inside db_init_database. */
59 static dlist *db_list = NULL;
/* Taken with P(mutex) in db_init_database before db_list is examined or changed. */
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
64 * Retrieve database type
74 * Initialize database data structure. In principle this should
75 * never have errors, or it is really fatal.
/*
 * Look up or create the B_DB handle for a PostgreSQL catalog.
 *
 * jcr                  job control record used for job messages
 * db_name, db_user, db_password, db_address, db_port, db_socket
 *                      connection parameters; the strings are copied with bstrdup
 * mult_db_connections  when zero, an existing handle whose db_name, db_address
 *                      and db_port all match is returned instead of a new one
 *
 * A new handle is zero-filled, receives copies of the connection parameters
 * and its pool-memory work buffers, and is appended to db_list.
 *
 * NOTE(review): several original lines of this function are absent from this
 * view (the embedded numbering skips), so the error return, the unlock calls
 * and any ref_count update cannot be confirmed from here.
 */
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79 const char *db_address, int db_port, const char *db_socket,
80 int mult_db_connections)
/* Fatal job message for a missing user name; the guarding test is not visible in this view. */
85 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
88 P(mutex); /* lock DB queue */
/* Create the handle list lazily, the first time any database is initialized. */
89 if (db_list == NULL) {
90 db_list = New(dlist(mdb, &mdb->link));
/* Sharing of an existing handle is attempted only when multiple connections were not requested. */
92 if (!mult_db_connections) {
93 /* Look to see if DB already open */
94 foreach_dlist(mdb, db_list) {
/* A handle matches when name, address and port are all equal; user and password are not compared. */
95 if (bstrcmp(mdb->db_name, db_name) &&
96 bstrcmp(mdb->db_address, db_address) &&
97 mdb->db_port == db_port) {
98 Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
101 return mdb; /* already open */
105 Dmsg0(100, "db_open first time\n");
/* Allocate a fresh handle and clear every field before filling it in. */
106 mdb = (B_DB *)malloc(sizeof(B_DB));
107 memset(mdb, 0, sizeof(B_DB));
/* The handle keeps its own copies of the caller's strings. */
108 mdb->db_name = bstrdup(db_name);
109 mdb->db_user = bstrdup(db_user);
/* NOTE(review): the gaps in numbering around the next three copies suggest guards that are not visible here - confirm. */
111 mdb->db_password = bstrdup(db_password);
114 mdb->db_address = bstrdup(db_address);
117 mdb->db_socket = bstrdup(db_socket);
119 mdb->db_port = db_port;
120 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
122 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
123 mdb->cached_path = get_pool_memory(PM_FNAME);
124 mdb->cached_path_id = 0;
/* Work buffers for file name, path and their escaped forms. */
126 mdb->fname = get_pool_memory(PM_FNAME);
127 mdb->path = get_pool_memory(PM_FNAME);
128 mdb->esc_name = get_pool_memory(PM_FNAME);
129 mdb->esc_path = get_pool_memory(PM_FNAME);
/* allow_transactions simply mirrors the mult_db_connections argument. */
130 mdb->allow_transactions = mult_db_connections;
131 db_list->append(mdb); /* put db in list */
136 /* Check that the database encoding corresponds to the encoding we want */
/*
 * Runs SELECT getdatabaseencoding() and compares the first column of the
 * first row with "SQL_ASCII".  On a match the client encoding is also set to
 * SQL_ASCII; on a mismatch a message is stored in mdb->errmsg and issued as
 * an M_WARNING job message.  A failed query or fetch is reported as M_ERROR.
 *
 * NOTE(review): the return statements are not visible in this view; ret
 * holds the result of the string comparison.
 */
137 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
/* No result handler is passed, so the stored result is read below with sql_fetch_row. */
142 if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
143 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
147 if ((row = sql_fetch_row(mdb)) == NULL) {
148 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
149 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
/* ret records whether the server reported SQL_ASCII. */
151 ret = bstrcmp(row[0], "SQL_ASCII");
154 /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
155 db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
157 } else { /* something is wrong with database encoding */
159 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
160 mdb->db_name, row[0]);
/* The mismatch is a warning, not an error, and is also written to the debug log. */
161 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
162 Dmsg1(50, "%s", mdb->errmsg);
169 * Now actually open the database. This can generate errors,
170 * which are returned in the errmsg
172 * DO NOT close the database or free(mdb) here !!!!
/*
 * Connect an initialized B_DB handle to the PostgreSQL server.
 *
 * Visible steps: test mdb->connected, initialize mdb->lock, format the port
 * number as text, call PQsetdbLogin in a loop of up to six attempts, then
 * check the catalog table version, set the session datestyle and
 * standard_conforming_strings, and check the database encoding.
 * Failure text is stored in mdb->errmsg.
 *
 * NOTE(review): many original lines are absent from this view (return
 * statements, unlock calls, the delay between retries), so return values and
 * lock handling cannot be confirmed from here.
 */
175 db_open_database(JCR *jcr, B_DB *mdb)
/* The body of this test is not visible; presumably an already-open handle returns early - confirm. */
181 if (mdb->connected) {
185 mdb->connected = false;
/* rwl_init returns non-zero on failure; the error text is built from that status. */
187 if ((errstat=rwl_init(&mdb->lock)) != 0) {
189 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
190 be.bstrerror(errstat));
/* buf receives db_port as decimal text; how port below is derived from it is not visible here. */
196 bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
202 /* If connection fails, try at 5 sec intervals for 30 seconds. */
203 for (int retry=0; retry < 6; retry++) {
204 /* connect to the database */
205 mdb->db = PQsetdbLogin(
206 mdb->db_address, /* default = localhost */
207 port, /* default port */
208 NULL, /* pg options */
209 NULL, /* tty, ignored */
210 mdb->db_name, /* database name */
211 mdb->db_user, /* login name */
212 mdb->db_password); /* password */
214 /* If no connect, try once more in case it is a timing problem */
215 if (PQstatus(mdb->db) == CONNECTION_OK) {
221 Dmsg0(50, "pg_real_connect done\n");
/* NOTE(review): this writes db_password in clear text to the debug output at level 50 - consider masking it. */
222 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
223 mdb->db_password==NULL?"(NULL)":mdb->db_password);
/* Status is checked again after the loop to report a connection that never succeeded. */
225 if (PQstatus(mdb->db) != CONNECTION_OK) {
226 Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
227 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
228 mdb->db_name, mdb->db_user);
233 mdb->connected = true;
235 if (!check_tables_version(jcr, mdb)) {
/* The result of this session setting is not checked. */
240 sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
242 /* tell PostgreSQL we are using standard conforming strings
243 and avoid warnings such as:
244 WARNING: nonstandard use of \\ in a string literal
246 sql_query(mdb, "set standard_conforming_strings=on");
248 /* check that encoding is SQL_ASCII */
/* The boolean result is discarded, so an encoding mismatch does not by itself fail the open here. */
249 check_database_encoding(jcr, mdb);
/*
 * Release one use of a B_DB handle.
 *
 * Any open transaction is ended and the current result is freed.  When
 * ref_count is zero the handle is removed from db_list, its lock is
 * destroyed, and its pool-memory buffers and copied connection strings are
 * freed.
 *
 * NOTE(review): the ref_count decrement, the call that closes the libpq
 * connection, the free of the handle itself and the action taken when
 * db_list becomes empty are on lines not visible in this view.
 */
256 db_close_database(JCR *jcr, B_DB *mdb)
261 db_end_transaction(jcr, mdb);
263 sql_free_result(mdb);
/* Only the last user tears the handle down. */
265 if (mdb->ref_count == 0) {
266 db_list->remove(mdb);
/* The connection is touched only if it was opened and the libpq handle exists. */
267 if (mdb->connected && mdb->db) {
270 rwl_destroy(&mdb->lock);
/* Return every pool-memory buffer obtained in db_init_database. */
271 free_pool_memory(mdb->errmsg);
272 free_pool_memory(mdb->cmd);
273 free_pool_memory(mdb->cached_path);
274 free_pool_memory(mdb->fname);
275 free_pool_memory(mdb->path);
276 free_pool_memory(mdb->esc_name);
277 free_pool_memory(mdb->esc_path);
/* Free the copied connection strings that are present. */
284 if (mdb->db_password) {
285 free(mdb->db_password);
287 if (mdb->db_address) {
288 free(mdb->db_address);
290 if (mdb->db_socket) {
291 free(mdb->db_socket);
/* Body not visible; presumably the empty list itself is released here - confirm. */
294 if (db_list->size() == 0) {
/*
 * Abort with an M_ABORT message if the libpq client library reports that it
 * is not thread-safe.  The check is compiled only when both
 * HAVE_BATCH_FILE_INSERT and HAVE_PQISTHREADSAFE are defined; otherwise the
 * visible body does nothing.
 */
302 void db_check_backend_thread_safe()
304 #ifdef HAVE_BATCH_FILE_INSERT
305 # ifdef HAVE_PQISTHREADSAFE
306 if (!PQisthreadsafe()) {
307 Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
308 "when using BatchMode.\n"));
/* NOTE(review): only the signature is visible in this view; presumably a per-thread cleanup hook for the backend - confirm against the full file. */
314 void db_thread_cleanup()
318 * Return the next unique index (auto-increment) for
319 * the given table. Return NULL on error.
321 * For PostgreSQL, NULL causes the auto-increment value
/*
 * Write the literal text "NULL" into index.  jcr, mdb and table are not used
 * on the visible lines.  The caller's index buffer must hold at least five
 * bytes because strcpy performs no bounds check.
 * NOTE(review): the return statement is not visible in this view.
 */
324 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
326 strcpy(index, "NULL");
332 * Escape strings so that PostgreSQL is happy
334 * NOTE! len is the length of the old string. Your new
335 * string must be long enough (max 2*old+1) to hold
336 * the escaped output.
/*
 * Escape len bytes of old into snew using the connection held in mdb->db.
 * PQescapeStringConn reports problems through the error variable; when it
 * does, a fatal job message is issued.
 * NOTE(review): the test of error is on a line not visible in this view.
 */
339 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
343 PQescapeStringConn(mdb->db, snew, old, len, &error);
345 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
346 /* error on encoding, probably invalid multibyte encoding in the source string
347 see PQescapeStringConn documentation for details. */
348 Dmsg0(500, "PQescapeStringConn failed\n");
353 * Submit a general SQL command (cmd), and for each row returned,
354 * the result_handler is called with the ctx.
/*
 * Run query on mdb and, when result_handler is not NULL, call it once per
 * returned row as result_handler(ctx, num_fields, row).
 *
 * A failed query stores "Query failed: ..." in mdb->errmsg.  With a handler,
 * the result is stored, iterated and then freed.  With a NULL handler none of
 * that is done here.
 *
 * NOTE(review): the return statements, any locking, and the action taken
 * when the handler returns non-zero are on lines not visible in this view.
 */
356 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
360 Dmsg0(500, "db_sql_query started\n");
/* sql_query signals failure with a non-zero value. */
363 if (sql_query(mdb, query) != 0) {
364 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
366 Dmsg0(500, "db_sql_query failed\n");
369 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
371 if (result_handler != NULL) {
372 Dmsg0(500, "db_sql_query invoking handler\n");
373 if ((mdb->result = sql_store_result(mdb)) != NULL) {
/* The field count is read once and passed to every handler call. */
374 int num_fields = sql_num_fields(mdb);
376 Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
377 while ((row = sql_fetch_row(mdb)) != NULL) {
379 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
/* A non-zero handler result is tested here; presumably it stops the iteration - confirm. */
380 if (result_handler(ctx, num_fields, row))
384 sql_free_result(mdb);
389 Dmsg0(500, "db_sql_query finished\n");
/*
 * Return the row at mdb->row_number from the current result as an array of
 * column value pointers, or NULL when row_number is outside [0, num_rows).
 *
 * The pointer array mdb->row is (re)allocated when it is missing or smaller
 * than num_fields, with 20 spare slots.  Each slot receives the pointer
 * returned by PQgetvalue for that column.
 *
 * NOTE(review): the free of a previous array, the assignment of row and the
 * row_number increment are on lines not visible in this view.
 */
396 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
399 POSTGRESQL_ROW row = NULL; // by default, return NULL
401 Dmsg0(500, "my_postgresql_fetch_row start\n");
/* Grow the pointer array only when the current one cannot hold every column. */
403 if (!mdb->row || mdb->row_size < mdb->num_fields) {
404 int num_fields = mdb->num_fields;
/* NOTE(review): the argument is a size_t expression but the format uses %d (assuming Dmsg1 is printf-style). */
405 Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
408 Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
411 num_fields += 20; /* add a bit extra */
/* NOTE(review): the malloc result is used without a visible NULL check. */
412 mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
413 mdb->row_size = num_fields;
415 // now reset the row_number now that we have the space allocated
419 // if still within the result set
420 if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
421 Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
422 // get each value from this row
423 for (j = 0; j < mdb->num_fields; j++) {
424 mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
425 Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
427 // increment the row number for the next call
432 Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
435 Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
/*
 * Compute the widest value of column field_num over all mdb->num_rows rows
 * of the current result.  A SQL NULL counts as width 4, the length of the
 * text "NULL"; other values are measured with cstrlen.
 * NOTE(review): the initialization of max_length and the return statement
 * are on lines not visible in this view.
 */
440 int my_postgresql_max_length(B_DB *mdb, int field_num) {
442 // for a given column, find the max length
449 for (i = 0; i < mdb->num_rows; i++) {
450 if (PQgetisnull(mdb->result, i, field_num)) {
451 this_length = 4; // "NULL"
453 this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
/* Keep the running maximum. */
456 if (max_length < this_length) {
457 max_length = this_length;
/*
 * Return a pointer to the descriptor of field mdb->field_number and advance
 * field_number by one.
 *
 * The descriptor array mdb->fields is (re)allocated when it is missing or
 * smaller than num_fields.  Each descriptor is filled with the column name,
 * its maximum value width, its type, and flags set to 0.
 *
 * NOTE(review): no check that field_number is below num_fields is visible
 * before the final indexing; the free of a previous array is also on a line
 * not visible in this view.
 */
464 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
468 Dmsg0(500, "my_postgresql_fetch_field starts\n");
470 if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
474 Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
/* NOTE(review): the malloc result is used without a visible NULL check. */
475 mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
476 mdb->fields_size = mdb->num_fields;
478 for (i = 0; i < mdb->num_fields; i++) {
479 Dmsg1(500, "filling field %d\n", i);
480 mdb->fields[i].name = PQfname(mdb->result, i);
/* Each call scans every row of the result for this column. */
481 mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
482 mdb->fields[i].type = PQftype(mdb->result, i);
483 mdb->fields[i].flags = 0;
485 Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
486 mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
487 mdb->fields[i].flags);
491 // increment field number for the next time around
493 Dmsg0(500, "my_postgresql_fetch_field finishes\n");
494 return &mdb->fields[mdb->field_number++];
/* Position the row cursor: stores row in mdb->row_number without range checking. */
497 void my_postgresql_data_seek(B_DB *mdb, int row)
499 // set the row number to be returned on the next call
500 // to my_postgresql_fetch_row
501 mdb->row_number = row;
/* Position the field cursor: stores field in mdb->field_number, the index used by my_postgresql_fetch_field. */
504 void my_postgresql_field_seek(B_DB *mdb, int field)
506 mdb->field_number = field;
510 * Note, if this routine returns 1 (failure), Bacula expects
511 * that no result has been stored.
512 * This is where QUERY_DB comes with Postgresql.
514 * Returns: 0 on success
/*
 * Execute query on mdb->db and record the outcome in the handle.
 *
 * The row and field cursors are reset to -1 and a leftover result is
 * cleared.  PQexec is called in a loop of up to ten attempts.  When the
 * status is PGRES_TUPLES_OK or PGRES_COMMAND_OK, num_fields and num_rows are
 * recorded, row_number is set to 0 and mdb->status to 0.  Otherwise the
 * result is cleared and mdb->status is set to 1.
 *
 * NOTE(review): the loop exit condition, any delay between attempts and the
 * return statements are on lines not visible in this view.
 */
518 int my_postgresql_query(B_DB *mdb, const char *query)
520 Dmsg0(500, "my_postgresql_query started\n");
521 // We are starting a new query. reset everything.
523 mdb->row_number = -1;
524 mdb->field_number = -1;
527 PQclear(mdb->result); /* hmm, someone forgot to free?? */
531 Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
533 for (int i=0; i < 10; i++) {
534 mdb->result = PQexec(mdb->db, query);
541 Dmsg1(50, "Query failed: %s\n", query);
545 mdb->status = PQresultStatus(mdb->result);
546 if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
/* NOTE(review): query is passed but the format string has no conversion for it. */
547 Dmsg1(500, "we have a result\n", query);
549 // how many fields in the set?
550 mdb->num_fields = (int)PQnfields(mdb->result);
551 Dmsg1(500, "we have %d fields\n", mdb->num_fields);
553 mdb->num_rows = PQntuples(mdb->result);
554 Dmsg1(500, "we have %d rows\n", mdb->num_rows);
556 mdb->row_number = 0; /* we can start to fetch something */
557 mdb->status = 0; /* succeed */
559 Dmsg1(50, "Result status failed: %s\n", query);
563 Dmsg0(500, "my_postgresql_query finishing\n");
/* NOTE(review): query is passed but the format string has no conversion for it. */
567 Dmsg1(500, "we failed\n", query);
568 PQclear(mdb->result);
570 mdb->status = 1; /* failed */
/*
 * Release the current libpq result with PQclear.
 * NOTE(review): lines between the signature and the PQclear call, and any
 * reset of mdb->result or the row/field arrays, are not visible in this view.
 */
574 void my_postgresql_free_result(B_DB *mdb)
579 PQclear(mdb->result);
/*
 * Return the current value of the sequence behind table_name's primary key.
 *
 * The sequence name is "basefiles_baseid_seq" for the basefiles table and
 * "<table>_<table>id_seq" for every other table.  The query
 * SELECT currval('<sequence>') is run with PQexec in a loop of up to ten
 * attempts, and the first column of the first row is converted with atoi.
 * On a bad result status an error is stored in mdb->errmsg.
 *
 * NOTE(review): the loop exit condition, PQclear of the result and the
 * return statement are on lines not visible in this view.
 */
595 static int my_postgresql_currval(B_DB *mdb, const char *table_name)
597 // Obtain the current value of the sequence that
598 // provides the serial value for primary key of the table.
600 // currval is local to our session. It is not affected by
601 // other transactions.
603 // Determine the name of the sequence.
604 // PostgreSQL automatically creates a sequence using
605 // <table>_<column>_seq.
606 // At the time of writing, all tables used this format for
607 // their primary key: <table>id
608 // Except for basefiles which has a primary key on baseid.
609 // Therefore, we need to special case that one table.
611 // everything else can use the PostgreSQL formula.
/* The bounded bstrncpy/bstrncat calls below are given sizeof(sequence), so an over-long table name cannot overrun the buffer. */
613 char sequence[NAMEDATALEN-1];
614 char query [NAMEDATALEN+50];
/* Case-insensitive match on the one table whose key column is not <table>id. */
618 if (strcasecmp(table_name, "basefiles") == 0) {
619 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
621 bstrncpy(sequence, table_name, sizeof(sequence));
622 bstrncat(sequence, "_", sizeof(sequence));
623 bstrncat(sequence, table_name, sizeof(sequence));
624 bstrncat(sequence, "id", sizeof(sequence));
627 bstrncat(sequence, "_seq", sizeof(sequence));
/* NOTE(review): the sequence name is placed in the SQL text unescaped, so table_name must come from trusted code. */
628 bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
630 Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
631 for (int i=0; i < 10; i++) {
632 result = PQexec(mdb->db, query);
639 Dmsg1(50, "Query failed: %s\n", query);
643 Dmsg0(500, "exec done");
645 if (PQresultStatus(result) == PGRES_TUPLES_OK) {
646 Dmsg0(500, "getting value");
/* NOTE(review): atoi cannot report a conversion error and returns int; values beyond int range would not be detected. */
647 id = atoi(PQgetvalue(result, 0, 0));
648 Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
650 Dmsg1(50, "Result status failed: %s\n", query);
651 Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
/*
 * Run an INSERT statement and return the id generated for the new row.
 *
 * The statement is executed with my_postgresql_query, which signals failure
 * with a non-zero value.  The affected-row count is stored in mdb->num_rows
 * and must be exactly 1.  The id comes from my_postgresql_currval for
 * table_name.
 *
 * NOTE(review): the values returned on the two failure paths are on lines
 * not visible in this view.
 */
660 int my_postgresql_insert_id(B_DB *mdb, const char *query, const char *table_name)
663 * First execute the insert query and then retrieve the currval.
665 if (my_postgresql_query(mdb, query)) {
669 mdb->num_rows = sql_affected_rows(mdb);
670 if (mdb->num_rows != 1) {
676 return my_postgresql_currval(mdb, table_name);
679 #ifdef HAVE_BATCH_FILE_INSERT
/*
 * Begin a batch insert: create the temporary table "batch" and put the
 * connection into COPY mode with "COPY batch FROM STDIN".
 *
 * The COPY statement is sent with PQexec in a loop of up to ten attempts and
 * the result status must be PGRES_COPY_IN.  On failure the libpq error text
 * is stored in mdb->errmsg and the result is cleared.
 *
 * NOTE(review): most column definitions of the temporary table, the loop
 * exit condition and the return statements are on lines not visible in this
 * view.
 */
681 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
683 const char *query = "COPY batch FROM STDIN";
685 Dmsg0(500, "my_postgresql_batch_start started\n");
/* my_postgresql_query returning 1 means the table could not be created. */
687 if (my_postgresql_query(mdb,
688 "CREATE TEMPORARY TABLE batch ("
694 "md5 varchar)") == 1)
696 Dmsg0(500, "my_postgresql_batch_start failed\n");
700 // We are starting a new query. reset everything.
702 mdb->row_number = -1;
703 mdb->field_number = -1;
/* Drop the result left by the CREATE statement before issuing COPY. */
705 my_postgresql_free_result(mdb);
707 for (int i=0; i < 10; i++) {
708 mdb->result = PQexec(mdb->db, query);
715 Dmsg1(50, "Query failed: %s\n", query);
719 mdb->status = PQresultStatus(mdb->result);
/* PGRES_COPY_IN is the status that marks the connection as ready to receive COPY data. */
720 if (mdb->status == PGRES_COPY_IN) {
721 // how many fields in the set?
722 mdb->num_fields = (int) PQnfields(mdb->result);
726 Dmsg1(50, "Result status failed: %s\n", query);
730 Dmsg0(500, "my_postgresql_batch_start finishing\n");
735 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
737 PQclear(mdb->result);
742 /* set error to something to abort operation */
/*
 * Finish a batch insert: send the end-of-copy marker with PQputCopyEnd,
 * passing error through to libpq, then read the command result with
 * PQgetResult and require PGRES_COMMAND_OK.  Failures store the libpq error
 * text in mdb->errmsg.
 *
 * PQputCopyEnd is repeated while it returns 0 and the retry counter has not
 * run out.
 *
 * NOTE(review): the initial value of count, the handling of the other
 * PQputCopyEnd results, any PQclear of result and the return statements are
 * on lines not visible in this view.
 */
743 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
748 Dmsg0(500, "my_postgresql_batch_end started\n");
750 if (!mdb) { /* no files ? */
755 res = PQputCopyEnd(mdb->db, error);
756 } while (res == 0 && --count > 0);
764 Dmsg0(500, "we failed\n");
766 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
769 /* Check command status and return to normal libpq state */
770 result = PQgetResult(mdb->db);
771 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
772 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
777 Dmsg0(500, "my_postgresql_batch_end finishing\n");
/*
 * Send one file record to the server while the connection is in COPY mode.
 *
 * mdb->fname and mdb->path are escaped into mdb->esc_name and mdb->esc_path,
 * each first grown to twice the source length plus one.  The record is a
 * tab-separated line: FileIndex, JobId, path, name, attributes, digest.
 * PQputCopyData is repeated while it returns 0 and the retry counter has not
 * run out.  Failure stores the libpq error text in mdb->errmsg.
 *
 * NOTE(review): how fname/path are derived from ar, the digest value used
 * when ar->Digest is empty, the remaining PQputCopyData arguments and the
 * return statements are on lines not visible in this view.
 */
782 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
790 mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
791 my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
793 mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
794 my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
/* A missing or empty digest selects a substitute value on a line not shown here. */
796 if (ar->Digest == NULL || ar->Digest[0] == 0) {
/* len receives the value returned by Mmsg for the formatted record. */
802 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
803 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
804 mdb->esc_name, ar->attr, digest);
807 res = PQputCopyData(mdb->db,
810 } while (res == 0 && --count > 0);
819 Dmsg0(500, "we failed\n");
821 Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
824 Dmsg0(500, "my_postgresql_batch_insert finishing\n");
829 #endif /* HAVE_BATCH_FILE_INSERT */
832 * Escape strings so that PostgreSQL is happy on COPY
834 * NOTE! len is the length of the old string. Your new
835 * string must be long enough (max 2*old+1) to hold
836 * the escaped output.
/*
 * Copy src into dest in a form suitable for COPY input.  Processing stops
 * after len bytes or at the first NUL byte in src, whichever comes first.
 * NOTE(review): the per-character escaping, the termination of dest and the
 * return value are on lines not visible in this view.
 */
838 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
840 /* we have to escape \t, \n, \r, \ */
843 while (len > 0 && *src) {
878 #ifdef HAVE_BATCH_FILE_INSERT
/* Opens a transaction and locks the Path table in SHARE ROW EXCLUSIVE mode. */
879 const char *my_pg_batch_lock_path_query =
880 "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
/* Opens a transaction and locks the Filename table in SHARE ROW EXCLUSIVE mode. */
883 const char *my_pg_batch_lock_filename_query =
884 "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
/* Commits the transaction opened by one of the lock queries in this group. */
886 const char *my_pg_batch_unlock_tables_query = "COMMIT";
/* Inserts into Path every distinct Path value of the batch table that Path does not already contain. */
888 const char *my_pg_batch_fill_path_query =
889 "INSERT INTO Path (Path) "
890 "SELECT a.Path FROM "
891 "(SELECT DISTINCT Path FROM batch) AS a "
892 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
/* Counterpart for Filename, selecting distinct Name values from batch; one fragment of this statement is on a line not visible in this view. */
895 const char *my_pg_batch_fill_filename_query =
896 "INSERT INTO Filename (Name) "
897 "SELECT a.Name FROM "
898 "(SELECT DISTINCT Name FROM batch) as a "
900 "(SELECT Name FROM Filename WHERE Name = a.Name)";
901 #endif /* HAVE_BATCH_FILE_INSERT */
903 #endif /* HAVE_POSTGRESQL */