2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to PostgreSQL
30 * These are PostgreSQL specific routines
32 * Dan Langille, December 2003
33 * based upon work done by Kern Sibbald, March 2000
38 /* The following is necessary so that we do not include
39 * the dummy external definition of DB.
41 #define __SQL_C /* indicate that this is sql.c */
46 #ifdef HAVE_POSTGRESQL
48 #include "postgres_ext.h" /* needed for NAMEDATALEN */
49 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
51 /* -----------------------------------------------------------------------
53 * PostgreSQL dependent defines and subroutines
55 * -----------------------------------------------------------------------
58 /* List of open databases */
59 static dlist *db_list = NULL;
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
64 * Retrieve database type
74 * Initialize database data structure. In principal this should
75 * never have errors, or it is really fatal.
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79 const char *db_address, int db_port, const char *db_socket,
80 int mult_db_connections)
85 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
88 P(mutex); /* lock DB queue */
89 if (db_list == NULL) {
90 db_list = New(dlist(mdb, &mdb->link));
92 if (!mult_db_connections) {
93 /* Look to see if DB already open */
94 foreach_dlist(mdb, db_list) {
95 if (bstrcmp(mdb->db_name, db_name) &&
96 bstrcmp(mdb->db_address, db_address) &&
97 mdb->db_port == db_port) {
98 Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
101 return mdb; /* already open */
105 Dmsg0(100, "db_open first time\n");
106 mdb = (B_DB *)malloc(sizeof(B_DB));
107 memset(mdb, 0, sizeof(B_DB));
108 mdb->db_name = bstrdup(db_name);
109 mdb->db_user = bstrdup(db_user);
111 mdb->db_password = bstrdup(db_password);
114 mdb->db_address = bstrdup(db_address);
117 mdb->db_socket = bstrdup(db_socket);
119 mdb->db_port = db_port;
120 mdb->have_insert_id = TRUE;
121 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
123 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
124 mdb->cached_path = get_pool_memory(PM_FNAME);
125 mdb->cached_path_id = 0;
127 mdb->fname = get_pool_memory(PM_FNAME);
128 mdb->path = get_pool_memory(PM_FNAME);
129 mdb->esc_name = get_pool_memory(PM_FNAME);
130 mdb->esc_path = get_pool_memory(PM_FNAME);
131 mdb->allow_transactions = mult_db_connections;
132 db_list->append(mdb); /* put db in list */
137 /* Check that the database correspond to the encoding we want */
138 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
143 if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
144 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
148 if ((row = sql_fetch_row(mdb)) == NULL) {
149 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
150 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
152 ret = bstrcmp(row[0], "SQL_ASCII");
155 /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
156 db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
158 } else { /* something is wrong with database encoding */
160 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
161 mdb->db_name, row[0]);
162 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
163 Dmsg1(50, "%s", mdb->errmsg);
170 * Now actually open the database. This can generate errors,
171 * which are returned in the errmsg
173 * DO NOT close the database or free(mdb) here !!!!
176 db_open_database(JCR *jcr, B_DB *mdb)
182 if (mdb->connected) {
186 mdb->connected = false;
188 if ((errstat=rwl_init(&mdb->lock)) != 0) {
190 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
191 be.bstrerror(errstat));
197 bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
203 /* If connection fails, try at 5 sec intervals for 30 seconds. */
204 for (int retry=0; retry < 6; retry++) {
205 /* connect to the database */
206 mdb->db = PQsetdbLogin(
207 mdb->db_address, /* default = localhost */
208 port, /* default port */
209 NULL, /* pg options */
210 NULL, /* tty, ignored */
211 mdb->db_name, /* database name */
212 mdb->db_user, /* login name */
213 mdb->db_password); /* password */
215 /* If no connect, try once more in case it is a timing problem */
216 if (PQstatus(mdb->db) == CONNECTION_OK) {
222 Dmsg0(50, "pg_real_connect done\n");
223 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
224 mdb->db_password==NULL?"(NULL)":mdb->db_password);
226 if (PQstatus(mdb->db) != CONNECTION_OK) {
227 Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
228 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
229 mdb->db_name, mdb->db_user);
234 mdb->connected = true;
236 if (!check_tables_version(jcr, mdb)) {
241 sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
243 /* tell PostgreSQL we are using standard conforming strings
244 and avoid warnings such as:
245 WARNING: nonstandard use of \\ in a string literal
247 sql_query(mdb, "set standard_conforming_strings=on");
249 /* check that encoding is SQL_ASCII */
250 check_database_encoding(jcr, mdb);
257 db_close_database(JCR *jcr, B_DB *mdb)
262 db_end_transaction(jcr, mdb);
264 sql_free_result(mdb);
266 if (mdb->ref_count == 0) {
267 db_list->remove(mdb);
268 if (mdb->connected && mdb->db) {
271 rwl_destroy(&mdb->lock);
272 free_pool_memory(mdb->errmsg);
273 free_pool_memory(mdb->cmd);
274 free_pool_memory(mdb->cached_path);
275 free_pool_memory(mdb->fname);
276 free_pool_memory(mdb->path);
277 free_pool_memory(mdb->esc_name);
278 free_pool_memory(mdb->esc_path);
285 if (mdb->db_password) {
286 free(mdb->db_password);
288 if (mdb->db_address) {
289 free(mdb->db_address);
291 if (mdb->db_socket) {
292 free(mdb->db_socket);
299 void db_check_backend_thread_safe()
301 #ifdef HAVE_BATCH_FILE_INSERT
302 # ifdef HAVE_PQISTHREADSAFE
303 if (!PQisthreadsafe()) {
304 Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
305 "when using BatchMode.\n"));
311 void db_thread_cleanup()
315 * Return the next unique index (auto-increment) for
316 * the given table. Return NULL on error.
318 * For PostgreSQL, NULL causes the auto-increment value
321 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
323 strcpy(index, "NULL");
329 * Escape strings so that PostgreSQL is happy
331 * NOTE! len is the length of the old string. Your new
332 * string must be long enough (max 2*old+1) to hold
333 * the escaped output.
336 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
340 PQescapeStringConn(mdb->db, snew, old, len, &error);
342 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
343 /* error on encoding, probably invalid multibyte encoding in the source string
344 see PQescapeStringConn documentation for details. */
345 Dmsg0(500, "PQescapeStringConn failed\n");
350 * Submit a general SQL command (cmd), and for each row returned,
351 * the sqlite_handler is called with the ctx.
353 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
357 Dmsg0(500, "db_sql_query started\n");
360 if (sql_query(mdb, query) != 0) {
361 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
363 Dmsg0(500, "db_sql_query failed\n");
366 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
368 if (result_handler != NULL) {
369 Dmsg0(500, "db_sql_query invoking handler\n");
370 if ((mdb->result = sql_store_result(mdb)) != NULL) {
371 int num_fields = sql_num_fields(mdb);
373 Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
374 while ((row = sql_fetch_row(mdb)) != NULL) {
376 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
377 if (result_handler(ctx, num_fields, row))
381 sql_free_result(mdb);
386 Dmsg0(500, "db_sql_query finished\n");
393 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
396 POSTGRESQL_ROW row = NULL; // by default, return NULL
398 Dmsg0(500, "my_postgresql_fetch_row start\n");
400 if (!mdb->row || mdb->row_size < mdb->num_fields) {
401 int num_fields = mdb->num_fields;
402 Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
405 Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
408 num_fields += 20; /* add a bit extra */
409 mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
410 mdb->row_size = num_fields;
412 // now reset the row_number now that we have the space allocated
416 // if still within the result set
417 if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
418 Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
419 // get each value from this row
420 for (j = 0; j < mdb->num_fields; j++) {
421 mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
422 Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
424 // increment the row number for the next call
429 Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
432 Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
437 int my_postgresql_max_length(B_DB *mdb, int field_num) {
439 // for a given column, find the max length
446 for (i = 0; i < mdb->num_rows; i++) {
447 if (PQgetisnull(mdb->result, i, field_num)) {
448 this_length = 4; // "NULL"
450 this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
453 if (max_length < this_length) {
454 max_length = this_length;
461 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
465 Dmsg0(500, "my_postgresql_fetch_field starts\n");
467 if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
471 Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
472 mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
473 mdb->fields_size = mdb->num_fields;
475 for (i = 0; i < mdb->num_fields; i++) {
476 Dmsg1(500, "filling field %d\n", i);
477 mdb->fields[i].name = PQfname(mdb->result, i);
478 mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
479 mdb->fields[i].type = PQftype(mdb->result, i);
480 mdb->fields[i].flags = 0;
482 Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
483 mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
484 mdb->fields[i].flags);
488 // increment field number for the next time around
490 Dmsg0(500, "my_postgresql_fetch_field finishes\n");
491 return &mdb->fields[mdb->field_number++];
494 void my_postgresql_data_seek(B_DB *mdb, int row)
496 // set the row number to be returned on the next call
497 // to my_postgresql_fetch_row
498 mdb->row_number = row;
501 void my_postgresql_field_seek(B_DB *mdb, int field)
503 mdb->field_number = field;
507 * Note, if this routine returns 1 (failure), Bacula expects
508 * that no result has been stored.
509 * This is where QUERY_DB comes with Postgresql.
511 * Returns: 0 on success
515 int my_postgresql_query(B_DB *mdb, const char *query)
517 Dmsg0(500, "my_postgresql_query started\n");
518 // We are starting a new query. reset everything.
520 mdb->row_number = -1;
521 mdb->field_number = -1;
524 PQclear(mdb->result); /* hmm, someone forgot to free?? */
528 Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
530 for (int i=0; i < 10; i++) {
531 mdb->result = PQexec(mdb->db, query);
538 Dmsg1(50, "Query failed: %s\n", query);
542 mdb->status = PQresultStatus(mdb->result);
543 if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
544 Dmsg1(500, "we have a result\n", query);
546 // how many fields in the set?
547 mdb->num_fields = (int)PQnfields(mdb->result);
548 Dmsg1(500, "we have %d fields\n", mdb->num_fields);
550 mdb->num_rows = PQntuples(mdb->result);
551 Dmsg1(500, "we have %d rows\n", mdb->num_rows);
553 mdb->row_number = 0; /* we can start to fetch something */
554 mdb->status = 0; /* succeed */
556 Dmsg1(50, "Result status failed: %s\n", query);
560 Dmsg0(500, "my_postgresql_query finishing\n");
564 Dmsg1(500, "we failed\n", query);
565 PQclear(mdb->result);
567 mdb->status = 1; /* failed */
571 void my_postgresql_free_result(B_DB *mdb)
576 PQclear(mdb->result);
592 int my_postgresql_currval(B_DB *mdb, const char *table_name)
594 // Obtain the current value of the sequence that
595 // provides the serial value for primary key of the table.
597 // currval is local to our session. It is not affected by
598 // other transactions.
600 // Determine the name of the sequence.
601 // PostgreSQL automatically creates a sequence using
602 // <table>_<column>_seq.
603 // At the time of writing, all tables used this format for
604 // for their primary key: <table>id
605 // Except for basefiles which has a primary key on baseid.
606 // Therefore, we need to special case that one table.
608 // everything else can use the PostgreSQL formula.
610 char sequence[NAMEDATALEN-1];
611 char query [NAMEDATALEN+50];
615 if (strcasecmp(table_name, "basefiles") == 0) {
616 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
618 bstrncpy(sequence, table_name, sizeof(sequence));
619 bstrncat(sequence, "_", sizeof(sequence));
620 bstrncat(sequence, table_name, sizeof(sequence));
621 bstrncat(sequence, "id", sizeof(sequence));
624 bstrncat(sequence, "_seq", sizeof(sequence));
625 bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
627 Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
628 for (int i=0; i < 10; i++) {
629 result = PQexec(mdb->db, query);
636 Dmsg1(50, "Query failed: %s\n", query);
640 Dmsg0(500, "exec done");
642 if (PQresultStatus(result) == PGRES_TUPLES_OK) {
643 Dmsg0(500, "getting value");
644 id = atoi(PQgetvalue(result, 0, 0));
645 Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
647 Dmsg1(50, "Result status failed: %s\n", query);
648 Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
657 #ifdef HAVE_BATCH_FILE_INSERT
659 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
661 const char *query = "COPY batch FROM STDIN";
663 Dmsg0(500, "my_postgresql_batch_start started\n");
665 if (my_postgresql_query(mdb,
666 "CREATE TEMPORARY TABLE batch ("
672 "md5 varchar)") == 1)
674 Dmsg0(500, "my_postgresql_batch_start failed\n");
678 // We are starting a new query. reset everything.
680 mdb->row_number = -1;
681 mdb->field_number = -1;
683 my_postgresql_free_result(mdb);
685 for (int i=0; i < 10; i++) {
686 mdb->result = PQexec(mdb->db, query);
693 Dmsg1(50, "Query failed: %s\n", query);
697 mdb->status = PQresultStatus(mdb->result);
698 if (mdb->status == PGRES_COPY_IN) {
699 // how many fields in the set?
700 mdb->num_fields = (int) PQnfields(mdb->result);
704 Dmsg1(50, "Result status failed: %s\n", query);
708 Dmsg0(500, "my_postgresql_batch_start finishing\n");
713 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
715 PQclear(mdb->result);
720 /* set error to something to abort operation */
721 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
726 Dmsg0(500, "my_postgresql_batch_end started\n");
728 if (!mdb) { /* no files ? */
733 res = PQputCopyEnd(mdb->db, error);
734 } while (res == 0 && --count > 0);
742 Dmsg0(500, "we failed\n");
744 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
747 /* Check command status and return to normal libpq state */
748 result = PQgetResult(mdb->db);
749 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
750 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
755 Dmsg0(500, "my_postgresql_batch_end finishing\n");
760 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
768 mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
769 my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
771 mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
772 my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
774 if (ar->Digest == NULL || ar->Digest[0] == 0) {
780 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
781 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
782 mdb->esc_name, ar->attr, digest);
785 res = PQputCopyData(mdb->db,
788 } while (res == 0 && --count > 0);
797 Dmsg0(500, "we failed\n");
799 Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
802 Dmsg0(500, "my_postgresql_batch_insert finishing\n");
807 #endif /* HAVE_BATCH_FILE_INSERT */
810 * Escape strings so that PostgreSQL is happy on COPY
812 * NOTE! len is the length of the old string. Your new
813 * string must be long enough (max 2*old+1) to hold
814 * the escaped output.
816 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
818 /* we have to escape \t, \n, \r, \ */
821 while (len > 0 && *src) {
856 #ifdef HAVE_BATCH_FILE_INSERT
857 const char *my_pg_batch_lock_path_query =
858 "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
861 const char *my_pg_batch_lock_filename_query =
862 "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
864 const char *my_pg_batch_unlock_tables_query = "COMMIT";
866 const char *my_pg_batch_fill_path_query =
867 "INSERT INTO Path (Path) "
868 "SELECT a.Path FROM "
869 "(SELECT DISTINCT Path FROM batch) AS a "
870 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
873 const char *my_pg_batch_fill_filename_query =
874 "INSERT INTO Filename (Name) "
875 "SELECT a.Name FROM "
876 "(SELECT DISTINCT Name FROM batch) as a "
878 "(SELECT Name FROM Filename WHERE Name = a.Name)";
879 #endif /* HAVE_BATCH_FILE_INSERT */
881 #endif /* HAVE_POSTGRESQL */