2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to PostgreSQL
30 * These are PostgreSQL specific routines
32 * Dan Langille, December 2003
33 * based upon work done by Kern Sibbald, March 2000
38 /* The following is necessary so that we do not include
39 * the dummy external definition of DB.
41 #define __SQL_C /* indicate that this is sql.c */
46 #ifdef HAVE_POSTGRESQL
48 #include "postgres_ext.h" /* needed for NAMEDATALEN */
49 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
51 /* -----------------------------------------------------------------------
53 * PostgreSQL dependent defines and subroutines
55 * -----------------------------------------------------------------------
58 /* List of open databases */
// Registry of B_DB descriptors: db_init_database appends to it and
// db_close_database removes from it.
59 static dlist *db_list = NULL;
// Taken with P(mutex) in db_init_database before db_list is touched.
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
64 * Retrieve database type
74 * Initialize database data structure. In principle this should
75 * never have errors, or it is really fatal.
// db_init_database: build, or reuse, the B_DB descriptor for one catalog.
//   jcr                 - job control record, used for Jmsg reporting
//   db_name, db_user, db_password, db_address, db_port, db_socket
//                       - connection settings; the strings are duplicated
//                         with bstrdup and the port is stored as is
//   mult_db_connections - when zero, db_list is searched for a descriptor with
//                         the same name, address and port and that one is
//                         returned; the value is also stored in
//                         allow_transactions
// No libpq call is visible here; the connection itself is made by
// db_open_database.
// NOTE(review): this listing omits some original lines (braces, short
// statements such as returns and the mutex release), so only the visible
// statements are described.
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79 const char *db_address, int db_port, const char *db_socket,
80 int mult_db_connections)
// A missing user name is reported as a fatal job message.
85 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
88 P(mutex); /* lock DB queue */
// The list of descriptors is created lazily on first use.
89 if (db_list == NULL) {
90 db_list = New(dlist(mdb, &mdb->link));
92 if (!mult_db_connections) {
93 /* Look to see if DB already open */
94 foreach_dlist(mdb, db_list) {
95 if (bstrcmp(mdb->db_name, db_name) &&
96 bstrcmp(mdb->db_address, db_address) &&
97 mdb->db_port == db_port) {
98 Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
101 return mdb; /* already open */
105 Dmsg0(100, "db_open first time\n");
// NOTE(review): the malloc result goes straight to memset on the next
// original line; no NULL check sits between them.
106 mdb = (B_DB *)malloc(sizeof(B_DB));
107 memset(mdb, 0, sizeof(B_DB));
108 mdb->db_name = bstrdup(db_name);
109 mdb->db_user = bstrdup(db_user);
111 mdb->db_password = bstrdup(db_password);
114 mdb->db_address = bstrdup(db_address);
117 mdb->db_socket = bstrdup(db_socket);
119 mdb->db_port = db_port;
120 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
122 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
123 mdb->cached_path = get_pool_memory(PM_FNAME);
124 mdb->cached_path_id = 0;
126 mdb->fname = get_pool_memory(PM_FNAME);
127 mdb->path = get_pool_memory(PM_FNAME);
128 mdb->esc_name = get_pool_memory(PM_FNAME);
129 mdb->esc_path = get_pool_memory(PM_FNAME);
130 mdb->allow_transactions = mult_db_connections;
131 db_list->append(mdb); /* put db in list */
136 /* Check that the database corresponds to the encoding we want */
// Runs SELECT getdatabaseencoding() and compares the first column of the
// answer with SQL_ASCII. When they match, the client encoding is forced to
// SQL_ASCII as well; otherwise a warning naming the database and the reported
// encoding is sent to the job and to the debug trace.
// A failed query or a failed row fetch is reported as M_ERROR.
// NOTE(review): the return statements are not visible in this listing;
// presumably ret is returned -- confirm.
137 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
142 if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
143 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
147 if ((row = sql_fetch_row(mdb)) == NULL) {
148 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
149 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
151 ret = bstrcmp(row[0], "SQL_ASCII");
154 /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
// NOTE(review): the result of this SET statement is not checked.
155 db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
157 } else { /* something is wrong with database encoding */
159 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
160 mdb->db_name, row[0]);
161 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
162 Dmsg1(50, "%s", mdb->errmsg);
169 * Now actually open the database. This can generate errors,
170 * which are returned in the errmsg
172 * DO NOT close the database or free(mdb) here !!!!
// db_open_database: connect an initialised descriptor to the server.
// Visible steps, in order:
//   1. test mdb->connected (presumably an early return when already open)
//   2. initialise the read/write lock, reporting failure through errmsg
//   3. format db_port as text for libpq
//   4. call PQsetdbLogin up to 6 times until PQstatus reports CONNECTION_OK
//   5. on failure store an explanatory message in errmsg
//   6. on success mark the descriptor connected, check the table version,
//      set datestyle and standard_conforming_strings, check the encoding
// NOTE(review): this listing omits some original lines (returns, the sleep
// between retries, mutex handling); only visible statements are described.
175 db_open_database(JCR *jcr, B_DB *mdb)
181 if (mdb->connected) {
185 mdb->connected = false;
187 if ((errstat=rwl_init(&mdb->lock)) != 0) {
189 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
190 be.bstrerror(errstat));
196 bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
202 /* If connection fails, try at 5 sec intervals for 30 seconds. */
203 for (int retry=0; retry < 6; retry++) {
204 /* connect to the database */
205 mdb->db = PQsetdbLogin(
206 mdb->db_address, /* default = localhost */
207 port, /* default port */
208 NULL, /* pg options */
209 NULL, /* tty, ignored */
210 mdb->db_name, /* database name */
211 mdb->db_user, /* login name */
212 mdb->db_password); /* password */
214 /* If no connect, try once more in case it is a timing problem */
215 if (PQstatus(mdb->db) == CONNECTION_OK) {
221 Dmsg0(50, "pg_real_connect done\n");
// NOTE(review): the next trace prints db_password in clear text at debug
// level 50 -- consider masking it.
222 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
223 mdb->db_password==NULL?"(NULL)":mdb->db_password);
225 if (PQstatus(mdb->db) != CONNECTION_OK) {
226 Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
227 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
228 mdb->db_name, mdb->db_user);
233 mdb->connected = true;
235 if (!check_tables_version(jcr, mdb)) {
240 sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
242 /* tell PostgreSQL we are using standard conforming strings
243 and avoid warnings such as:
244 WARNING: nonstandard use of \\ in a string literal
246 sql_query(mdb, "set standard_conforming_strings=on");
248 /* check that encoding is SQL_ASCII */
// NOTE(review): the boolean result of check_database_encoding is ignored, so
// a wrong encoding only produces the warning it emits itself.
249 check_database_encoding(jcr, mdb);
// db_close_database: drop a reference to the descriptor and tear it down
// once ref_count is 0.
// Any open transaction is ended and the current result is freed first.
// On the last reference the descriptor is removed from db_list, the lock is
// destroyed, the pool buffers are returned and the duplicated connection
// strings are freed; the escaped-object buffer is released with PQfreemem.
// NOTE(review): the ref_count decrement, the libpq disconnect and the frees
// of db_name and db_user are not visible in this listing -- presumably they
// sit on omitted lines; confirm against the full source.
256 db_close_database(JCR *jcr, B_DB *mdb)
261 db_end_transaction(jcr, mdb);
263 sql_free_result(mdb);
265 if (mdb->ref_count == 0) {
266 db_list->remove(mdb);
267 if (mdb->connected && mdb->db) {
270 rwl_destroy(&mdb->lock);
271 free_pool_memory(mdb->errmsg);
272 free_pool_memory(mdb->cmd);
273 free_pool_memory(mdb->cached_path);
274 free_pool_memory(mdb->fname);
275 free_pool_memory(mdb->path);
276 free_pool_memory(mdb->esc_name);
277 free_pool_memory(mdb->esc_path);
284 if (mdb->db_password) {
285 free(mdb->db_password);
287 if (mdb->db_address) {
288 free(mdb->db_address);
290 if (mdb->db_socket) {
291 free(mdb->db_socket);
294 PQfreemem(mdb->esc_obj);
// The body of this test is not visible; it runs once the last descriptor is
// gone from the list.
297 if (db_list->size() == 0) {
// Aborts the program (Emsg0 with M_ABORT) when batch insert support is
// compiled in, PQisthreadsafe is available, and it reports that the client
// library is not thread-safe. With either macro undefined the check is
// compiled out.
305 void db_check_backend_thread_safe()
307 #ifdef HAVE_BATCH_FILE_INSERT
308 # ifdef HAVE_PQISTHREADSAFE
309 if (!PQisthreadsafe()) {
310 Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
311 "when using BatchMode.\n"));
// Per-thread cleanup hook for this backend.
// NOTE(review): the body is not visible in this listing; presumably empty for
// PostgreSQL -- confirm.
317 void db_thread_cleanup()
321 * Return the next unique index (auto-increment) for
322 * the given table. Return NULL on error.
324 * For PostgreSQL, NULL causes the auto-increment value
// Writes the text NULL into index so that the INSERT built by the caller lets
// the server assign the auto-increment value. jcr, mdb and table are not used
// by the visible statement.
// NOTE(review): strcpy assumes the buffer behind index holds at least 5
// bytes; the return statement is not visible in this listing.
327 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
329 strcpy(index, "NULL");
334 * Escape binary so that PostgreSQL is happy
// Escapes the binary buffer old with PQescapeByteaConn.
// The previously escaped buffer is released with PQfreemem first; the new one
// is kept in mdb->esc_obj and returned as char *. It stays owned by the
// descriptor, so callers must not free it. A NULL result from libpq is
// reported as M_FATAL.
// NOTE(review): the remaining PQescapeByteaConn arguments are on lines that
// are absent from this listing.
338 db_escape_object(JCR *jcr, B_DB *mdb, char *old, int len)
342 PQfreemem(mdb->esc_obj);
345 mdb->esc_obj = PQescapeByteaConn(mdb->db, (unsigned const char *)old,
349 Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
352 return (char *)mdb->esc_obj;
356 * Unescape binary object so that PostgreSQL is happy
// Converts the escaped bytea text in from back to binary with PQunescapeBytea
// and copies new_len bytes into the pool buffer *dest, which is grown to
// new_len+1 bytes first.
// NOTE(review): expected_len is not referenced by any visible statement; the
// assignment of *dest_len and the release of obj are presumably on omitted
// lines -- confirm.
360 db_unescape_object(JCR *jcr, B_DB *mdb,
361 char *from, int32_t expected_len,
362 POOLMEM **dest, int32_t *dest_len)
373 obj = PQunescapeBytea((unsigned const char *)from, &new_len);
// NOTE(review): the message names PQunescapeByteaConn but the call above is
// PQunescapeBytea.
376 Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
380 *dest = check_pool_memory_size(*dest, new_len+1);
381 memcpy(*dest, obj, new_len);
// NOTE(review): 010 is an octal literal, i.e. debug level 8.
386 Dmsg1(010, "obj size: %d\n", *dest_len);
390 * Escape strings so that PostgreSQL is happy
392 * NOTE! len is the length of the old string. Your new
393 * string must be long enough (max 2*old+1) to hold
394 * the escaped output.
// Escapes len bytes of old into snew with PQescapeStringConn, using the
// encoding of the open connection. The caller provides snew.
// When libpq flags an error the job gets an M_FATAL message and the failure
// is traced at level 500.
// NOTE(review): the declaration of error and the test around the Jmsg call
// are on lines absent from this listing.
397 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
401 PQescapeStringConn(mdb->db, snew, old, len, &error);
403 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
404 /* error on encoding, probably invalid multibyte encoding in the source string
405 see PQescapeStringConn documentation for details. */
406 Dmsg0(500, "PQescapeStringConn failed\n");
411 * Submit a general SQL command (query), and for each row returned,
412 * the result_handler is called with the ctx.
// db_sql_query: run query and, when a handler is given, feed it every row.
//   mdb            - open catalog descriptor
//   query          - SQL text passed to sql_query
//   result_handler - optional callback, called as (ctx, num_fields, row);
//                    NULL means the result is left for the caller
//   ctx            - opaque pointer handed to the callback
// A failing sql_query stores Query failed plus the server error in errmsg.
// With a handler, the stored result is walked row by row and freed afterwards.
// NOTE(review): locking, the return statements and the action taken when the
// handler returns non-zero are on lines absent from this listing.
414 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
418 Dmsg0(500, "db_sql_query started\n");
421 if (sql_query(mdb, query) != 0) {
422 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
424 Dmsg0(500, "db_sql_query failed\n");
427 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
429 if (result_handler != NULL) {
430 Dmsg0(500, "db_sql_query invoking handler\n");
431 if ((mdb->result = sql_store_result(mdb)) != NULL) {
432 int num_fields = sql_num_fields(mdb);
434 Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
435 while ((row = sql_fetch_row(mdb)) != NULL) {
437 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
438 if (result_handler(ctx, num_fields, row))
442 sql_free_result(mdb);
447 Dmsg0(500, "db_sql_query finished\n");
// Returns the next row of mdb->result as an array of column value pointers,
// or NULL (the initial value of row) when row_number is outside
// 0..num_rows-1.
// The pointer array mdb->row is reused between calls and reallocated, with 20
// spare slots, whenever it is missing or smaller than num_fields. The values
// themselves come from PQgetvalue and belong to the result.
// NOTE(review): the free of the old array, the row_number increment and the
// final return are on lines absent from this listing.
454 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
457 POSTGRESQL_ROW row = NULL; // by default, return NULL
459 Dmsg0(500, "my_postgresql_fetch_row start\n");
461 if (!mdb->row || mdb->row_size < mdb->num_fields) {
462 int num_fields = mdb->num_fields;
// NOTE(review): the argument below has type size_t but is printed with %d.
463 Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
466 Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
469 num_fields += 20; /* add a bit extra */
// NOTE(review): the malloc result is used without a NULL check; the next
// original line already records the new size.
470 mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
471 mdb->row_size = num_fields;
473 // now reset the row_number now that we have the space allocated
477 // if still within the result set
478 if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
479 Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
480 // get each value from this row
481 for (j = 0; j < mdb->num_fields; j++) {
482 mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
483 Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
485 // increment the row number for the next call
490 Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
493 Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
// Scans every row of mdb->result and returns the widest display length of
// column field_num. SQL NULL values count as 4 characters; other values are
// measured with cstrlen.
// NOTE(review): declarations, the initial max_length and the return statement
// are on lines absent from this listing. The scan is linear in num_rows and
// is repeated per column by my_postgresql_fetch_field.
498 int my_postgresql_max_length(B_DB *mdb, int field_num) {
500 // for a given column, find the max length
507 for (i = 0; i < mdb->num_rows; i++) {
508 if (PQgetisnull(mdb->result, i, field_num)) {
509 this_length = 4; // "NULL"
511 this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
514 if (max_length < this_length) {
515 max_length = this_length;
// Returns the descriptor of the field at mdb->field_number and advances that
// counter by one.
// The descriptor array is (re)built when it is missing or smaller than
// num_fields: each entry receives the column name, the widest value length,
// the type OID reported by PQftype, and flags = 0.
// NOTE(review): my_postgresql_query resets field_number to -1, so callers
// presumably seek to field 0 first; index -1 would be out of bounds.
// NOTE(review): the free of the old array is on a line absent from this
// listing.
522 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
526 Dmsg0(500, "my_postgresql_fetch_field starts\n");
528 if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
532 Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
// NOTE(review): the malloc result is used without a NULL check.
533 mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
534 mdb->fields_size = mdb->num_fields;
536 for (i = 0; i < mdb->num_fields; i++) {
537 Dmsg1(500, "filling field %d\n", i);
538 mdb->fields[i].name = PQfname(mdb->result, i);
539 mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
540 mdb->fields[i].type = PQftype(mdb->result, i);
541 mdb->fields[i].flags = 0;
543 Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
544 mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
545 mdb->fields[i].flags);
549 // increment field number for the next time around
551 Dmsg0(500, "my_postgresql_fetch_field finishes\n");
552 return &mdb->fields[mdb->field_number++];
// Moves the row cursor: stores row in mdb->row_number without any range
// check. my_postgresql_fetch_row validates the value on its next call.
555 void my_postgresql_data_seek(B_DB *mdb, int row)
557 // set the row number to be returned on the next call
558 // to my_postgresql_fetch_row
559 mdb->row_number = row;
// Moves the field cursor: stores field in mdb->field_number without any range
// check. my_postgresql_fetch_field uses the value as an array index.
562 void my_postgresql_field_seek(B_DB *mdb, int field)
564 mdb->field_number = field;
568 * Note, if this routine returns 1 (failure), Bacula expects
569 * that no result has been stored.
570 * This is where QUERY_DB comes with Postgresql.
572 * Returns: 0 on success
// my_postgresql_query: execute query with PQexec and cache the result
// metadata in the descriptor.
// The cursors are reset to -1, any previous result is cleared, and PQexec is
// attempted in a loop of at most 10 iterations. For PGRES_TUPLES_OK or
// PGRES_COMMAND_OK the field and row counts are recorded, row_number becomes
// 0 and status is set to 0. Otherwise the result is cleared and status is set
// to 1.
// NOTE(review): the retry condition, the gotos or returns and the final
// return value are on lines absent from this listing.
576 int my_postgresql_query(B_DB *mdb, const char *query)
578 Dmsg0(500, "my_postgresql_query started\n");
579 // We are starting a new query. reset everything.
581 mdb->row_number = -1;
582 mdb->field_number = -1;
585 PQclear(mdb->result); /* hmm, someone forgot to free?? */
589 Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
591 for (int i=0; i < 10; i++) {
592 mdb->result = PQexec(mdb->db, query);
599 Dmsg1(50, "Query failed: %s\n", query);
603 mdb->status = PQresultStatus(mdb->result);
604 if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
// NOTE(review): the format below has no conversion, so query is unused.
605 Dmsg1(500, "we have a result\n", query);
607 // how many fields in the set?
608 mdb->num_fields = (int)PQnfields(mdb->result);
609 Dmsg1(500, "we have %d fields\n", mdb->num_fields);
611 mdb->num_rows = PQntuples(mdb->result);
612 Dmsg1(500, "we have %d rows\n", mdb->num_rows);
614 mdb->row_number = 0; /* we can start to fetch something */
615 mdb->status = 0; /* succeed */
617 Dmsg1(50, "Result status failed: %s\n", query);
621 Dmsg0(500, "my_postgresql_query finishing\n");
// NOTE(review): the format below has no conversion, so query is unused.
625 Dmsg1(500, "we failed\n", query);
626 PQclear(mdb->result);
628 mdb->status = 1; /* failed */
// Releases the libpq result held by the descriptor with PQclear.
// NOTE(review): the guard around PQclear and the release of the row and field
// arrays are presumably on lines absent from this listing -- confirm.
632 void my_postgresql_free_result(B_DB *mdb)
637 PQclear(mdb->result);
// Returns the value most recently generated in this session by the sequence
// behind the primary key of table_name.
// The sequence name is basefiles_baseid_seq for the basefiles table and
// <table>_<table>id_seq for every other table. The query SELECT currval(...)
// is attempted in a loop of at most 10 iterations and the first column of the
// first row is converted with atoi.
// NOTE(review): declarations of result and id, the retry condition, PQclear
// and the return are on lines absent from this listing.
653 static int my_postgresql_currval(B_DB *mdb, const char *table_name)
655 // Obtain the current value of the sequence that
656 // provides the serial value for primary key of the table.
658 // currval is local to our session. It is not affected by
659 // other transactions.
661 // Determine the name of the sequence.
662 // PostgreSQL automatically creates a sequence using
663 // <table>_<column>_seq.
664 // At the time of writing, all tables used this format for
665 // their primary key: <table>id
666 // Except for basefiles which has a primary key on baseid.
667 // Therefore, we need to special case that one table.
669 // everything else can use the PostgreSQL formula.
// NOTE(review): the buffer holds NAMEDATALEN-1 bytes including the
// terminator, and the bounded copies below presumably truncate silently when
// the table name is long -- confirm the bstrncpy/bstrncat contract.
671 char sequence[NAMEDATALEN-1];
672 char query [NAMEDATALEN+50];
676 if (strcasecmp(table_name, "basefiles") == 0) {
677 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
679 bstrncpy(sequence, table_name, sizeof(sequence));
680 bstrncat(sequence, "_", sizeof(sequence));
681 bstrncat(sequence, table_name, sizeof(sequence));
682 bstrncat(sequence, "id", sizeof(sequence));
685 bstrncat(sequence, "_seq", sizeof(sequence));
686 bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
688 Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
689 for (int i=0; i < 10; i++) {
690 result = PQexec(mdb->db, query);
697 Dmsg1(50, "Query failed: %s\n", query);
701 Dmsg0(500, "exec done");
703 if (PQresultStatus(result) == PGRES_TUPLES_OK) {
704 Dmsg0(500, "getting value");
// NOTE(review): atoi reports no conversion errors and yields an int, while
// the sequence value may exceed that range.
705 id = atoi(PQgetvalue(result, 0, 0));
706 Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
708 Dmsg1(50, "Result status failed: %s\n", query);
709 Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
// Runs the INSERT in query and returns the key the server generated for it,
// obtained from my_postgresql_currval(mdb, table_name).
// A non-zero result from my_postgresql_query, or an affected-row count other
// than 1, takes a separate path.
// NOTE(review): the bodies of both error branches are on lines absent from
// this listing; presumably they return 0 -- confirm.
718 int my_postgresql_insert_autokey_record(B_DB *mdb, const char *query, const char *table_name)
721 * First execute the insert query and then retrieve the currval.
723 if (my_postgresql_query(mdb, query)) {
727 mdb->num_rows = sql_affected_rows(mdb);
728 if (mdb->num_rows != 1) {
734 return my_postgresql_currval(mdb, table_name);
737 #ifdef HAVE_BATCH_FILE_INSERT
// Prepares the connection for batch inserts: creates the temporary table
// batch, then issues COPY batch FROM STDIN and expects the PGRES_COPY_IN
// status. The COPY is attempted in a loop of at most 10 iterations.
// On failure the libpq error text is stored in errmsg and the result is
// cleared.
// NOTE(review): most column definitions of the CREATE statement, the retry
// condition and all return statements are on lines absent from this listing.
739 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
741 const char *query = "COPY batch FROM STDIN";
743 Dmsg0(500, "my_postgresql_batch_start started\n");
745 if (my_postgresql_query(mdb,
746 "CREATE TEMPORARY TABLE batch ("
752 "md5 varchar)") == 1)
754 Dmsg0(500, "my_postgresql_batch_start failed\n");
758 // We are starting a new query. reset everything.
760 mdb->row_number = -1;
761 mdb->field_number = -1;
763 my_postgresql_free_result(mdb);
765 for (int i=0; i < 10; i++) {
766 mdb->result = PQexec(mdb->db, query);
773 Dmsg1(50, "Query failed: %s\n", query);
777 mdb->status = PQresultStatus(mdb->result);
778 if (mdb->status == PGRES_COPY_IN) {
779 // how many fields in the set?
780 mdb->num_fields = (int) PQnfields(mdb->result);
784 Dmsg1(50, "Result status failed: %s\n", query);
788 Dmsg0(500, "my_postgresql_batch_start finishing\n");
793 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
795 PQclear(mdb->result);
800 /* set error to something to abort operation */
// Finishes the COPY started by my_postgresql_batch_start.
//   error - passed unchanged to PQputCopyEnd
// PQputCopyEnd is repeated while it returns 0 and the retry counter count is
// not exhausted. Afterwards PQgetResult fetches the command status; anything
// other than PGRES_COMMAND_OK stores the libpq error text in errmsg.
// A NULL mdb is accepted (no files case).
// NOTE(review): the initial value of count, the tests on res, the status
// updates and the return statements are on lines absent from this listing.
801 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
806 Dmsg0(500, "my_postgresql_batch_end started\n");
808 if (!mdb) { /* no files ? */
813 res = PQputCopyEnd(mdb->db, error);
814 } while (res == 0 && --count > 0);
822 Dmsg0(500, "we failed\n");
824 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
827 /* Check command status and return to normal libpq state */
828 result = PQgetResult(mdb->db);
829 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
830 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
835 Dmsg0(500, "my_postgresql_batch_end finishing\n");
// Sends one file record to the server as a line of COPY data.
// The file name and path held in the descriptor are escaped into esc_name and
// esc_path (buffers grown to twice the source length plus one), then a
// tab-separated line is formatted into mdb->cmd with FileIndex, JobId, path,
// name, attributes and digest. PQputCopyData is repeated while it returns 0
// and the retry counter count is not exhausted.
// NOTE(review): how fname/path are filled, the digest selection, the
// remaining PQputCopyData arguments and the return statements are on lines
// absent from this listing.
840 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
848 mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
849 my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
851 mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
852 my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
854 if (ar->Digest == NULL || ar->Digest[0] == 0) {
860 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
861 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
862 mdb->esc_name, ar->attr, digest);
865 res = PQputCopyData(mdb->db,
868 } while (res == 0 && --count > 0);
877 Dmsg0(500, "we failed\n");
879 Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
882 Dmsg0(500, "my_postgresql_batch_insert finishing\n");
887 #endif /* HAVE_BATCH_FILE_INSERT */
890 * Escape strings so that PostgreSQL is happy on COPY
892 * NOTE! len is the length of the old string. Your new
893 * string must be long enough (max 2*old+1) to hold
894 * the escaped output.
// Copies src into dest while escaping the characters that are special in
// COPY text format. The loop stops after len source bytes or at the first NUL
// byte, whichever comes first.
// NOTE(review): the loop body, the terminator write and the return statement
// are on lines absent from this listing.
896 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
898 /* we have to escape \t, \n, \r, \ */
901 while (len > 0 && *src) {
936 #ifdef HAVE_BATCH_FILE_INSERT
// Opens a transaction and locks the Path table in SHARE ROW EXCLUSIVE mode.
937 const char *my_pg_batch_lock_path_query =
938 "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
// Opens a transaction and locks the Filename table in the same mode.
941 const char *my_pg_batch_lock_filename_query =
942 "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
// Commits the transaction opened by either lock query; the commit also
// releases the table lock.
944 const char *my_pg_batch_unlock_tables_query = "COMMIT";
// Inserts into Path every distinct path of the batch table that is not
// already present.
946 const char *my_pg_batch_fill_path_query =
947 "INSERT INTO Path (Path) "
948 "SELECT a.Path FROM "
949 "(SELECT DISTINCT Path FROM batch) AS a "
950 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
// Same for file names.
// NOTE(review): one fragment of this literal is on a line absent from this
// listing.
953 const char *my_pg_batch_fill_filename_query =
954 "INSERT INTO Filename (Name) "
955 "SELECT a.Name FROM "
956 "(SELECT DISTINCT Name FROM batch) as a "
958 "(SELECT Name FROM Filename WHERE Name = a.Name)";
959 #endif /* HAVE_BATCH_FILE_INSERT */
961 #endif /* HAVE_POSTGRESQL */