2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to PostgreSQL
30 * These are PostgreSQL specific routines
32 * Dan Langille, December 2003
33 * based upon work done by Kern Sibbald, March 2000
38 /* The following is necessary so that we do not include
39 * the dummy external definition of DB.
41 #define __SQL_C /* indicate that this is sql.c */
46 #ifdef HAVE_POSTGRESQL
48 #include "postgres_ext.h" /* needed for NAMEDATALEN */
49 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
51 /* -----------------------------------------------------------------------
53 * PostgreSQL dependent defines and subroutines
55 * -----------------------------------------------------------------------
58 /* List of open databases */
// Handles created by db_init_database() are appended to this list and taken
// off it again in db_close_database().
59 static dlist *db_list = NULL;
// Locked with P(mutex) in db_init_database() before db_list is touched.
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
64 * Retrieve database type
74 * Initialize database data structure. In principle this should
75 * never have errors, or it is really fatal.
// Build the B_DB descriptor for a PostgreSQL catalog, or hand back an existing
// one. Only bookkeeping is visible here (allocation, copies of the connection
// parameters, pool buffers); PQsetdbLogin() is called from db_open_database().
//
//   jcr                  job context used for the Jmsg() error report
//   db_name, db_address,
//   db_port              identify the catalog; compared against open handles
//   db_user, db_password,
//   db_socket            duplicated with bstrdup() onto the new descriptor
//   mult_db_connections  zero: look for a matching handle already in db_list
//                        and return it; non-zero: skip that lookup. The value
//                        is also copied into allow_transactions.
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79 const char *db_address, int db_port, const char *db_socket,
80 int mult_db_connections)
85 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
// db_list is shared, so it is only examined and modified after taking mutex.
88 P(mutex); /* lock DB queue */
89 if (db_list == NULL) {
// First use: create the list; the dlist is told where the link member lives
// inside B_DB (&mdb->link).
90 db_list = New(dlist(mdb, &mdb->link));
92 if (!mult_db_connections) {
93 /* Look to see if DB already open */
94 foreach_dlist(mdb, db_list) {
// A handle matches when name, address and port all compare equal; user and
// password are not part of the comparison.
95 if (bstrcmp(mdb->db_name, db_name) &&
96 bstrcmp(mdb->db_address, db_address) &&
97 mdb->db_port == db_port) {
98 Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
101 return mdb; /* already open */
105 Dmsg0(100, "db_open first time\n");
// NOTE(review): the malloc() result is handed to memset() without a NULL check.
106 mdb = (B_DB *)malloc(sizeof(B_DB));
107 memset(mdb, 0, sizeof(B_DB));
// The descriptor keeps its own copies of the caller's strings.
108 mdb->db_name = bstrdup(db_name);
109 mdb->db_user = bstrdup(db_user);
111 mdb->db_password = bstrdup(db_password);
114 mdb->db_address = bstrdup(db_address);
117 mdb->db_socket = bstrdup(db_socket);
119 mdb->db_port = db_port;
120 mdb->have_insert_id = TRUE;
// Scratch buffers from the memory pool; they are given back with
// free_pool_memory() in db_close_database().
121 mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
123 mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */
124 mdb->cached_path = get_pool_memory(PM_FNAME);
125 mdb->cached_path_id = 0;
127 mdb->fname = get_pool_memory(PM_FNAME);
128 mdb->path = get_pool_memory(PM_FNAME);
129 mdb->esc_name = get_pool_memory(PM_FNAME);
130 mdb->esc_path = get_pool_memory(PM_FNAME);
// allow_transactions simply mirrors mult_db_connections.
131 mdb->allow_transactions = mult_db_connections;
132 db_list->append(mdb); /* put db in list */
137 /* Check that the database corresponds to the encoding we want */
// Ask the server for the database encoding and compare it with "SQL_ASCII".
// On a match the client encoding is switched to SQL_ASCII as well; on a
// mismatch a warning (M_WARNING) is queued for the job and the same text is
// written to the debug log. Failure to run the query or to fetch its row is
// reported as M_ERROR.
138 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
143 if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
144 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
148 if ((row = sql_fetch_row(mdb)) == NULL) {
149 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
150 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
// ret records whether the reported encoding equals "SQL_ASCII" (per bstrcmp).
152 ret = bstrcmp(row[0], "SQL_ASCII");
155 /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
// The outcome of this SET statement is not checked.
156 db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
158 } else { /* something is wrong with database encoding */
160 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
161 mdb->db_name, row[0]);
162 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
163 Dmsg1(50, "%s", mdb->errmsg);
170 * Now actually open the database. This can generate errors,
171 * which are returned in the errmsg
173 * DO NOT close the database or free(mdb) here !!!!
// Connect the descriptor prepared by db_init_database() to the server and set
// up the session: table version check, ISO date style, standard conforming
// strings and the encoding check. Error text is left in mdb->errmsg.
176 db_open_database(JCR *jcr, B_DB *mdb)
// A handle that is already connected is detected here.
182 if (mdb->connected) {
186 mdb->connected = false;
188 if ((errstat=rwl_init(&mdb->lock)) != 0) {
190 Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
191 be.bstrerror(errstat));
// db_port is rendered as decimal text into buf; presumably the port argument
// passed to PQsetdbLogin() below refers to it when a port is configured --
// TODO confirm.
197 bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
203 /* If connection fails, try at 5 sec intervals for 30 seconds. */
204 for (int retry=0; retry < 6; retry++) {
205 /* connect to the database */
206 mdb->db = PQsetdbLogin(
207 mdb->db_address, /* default = localhost */
208 port, /* default port */
209 NULL, /* pg options */
210 NULL, /* tty, ignored */
211 mdb->db_name, /* database name */
212 mdb->db_user, /* login name */
213 mdb->db_password); /* password */
215 /* If no connect, try once more in case it is a timing problem */
216 if (PQstatus(mdb->db) == CONNECTION_OK) {
222 Dmsg0(50, "pg_real_connect done\n");
// NOTE(review): this debug message prints the database password in clear text.
223 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
224 mdb->db_password==NULL?"(NULL)":mdb->db_password);
// Connection status is checked again after the retry loop; a bad status puts
// an explanatory message into errmsg.
226 if (PQstatus(mdb->db) != CONNECTION_OK) {
227 Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
228 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
229 mdb->db_name, mdb->db_user);
234 mdb->connected = true;
236 if (!check_tables_version(jcr, mdb)) {
// The return values of the two session-setting queries below are not checked.
241 sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
243 /* tell PostgreSQL we are using standard conforming strings
244 and avoid warnings such as:
245 WARNING: nonstandard use of \\ in a string literal
247 sql_query(mdb, "set standard_conforming_strings=on");
249 /* check that encoding is SQL_ASCII */
// The bool result of check_database_encoding() is discarded.
250 check_database_encoding(jcr, mdb);
// Close a handle obtained from db_init_database(). Any open transaction is
// ended and the pending result is freed first; the descriptor itself is torn
// down only when ref_count is zero.
257 db_close_database(JCR *jcr, B_DB *mdb)
262 db_end_transaction(jcr, mdb);
264 sql_free_result(mdb);
266 if (mdb->ref_count == 0) {
267 db_list->remove(mdb);
// The server connection is only dealt with when one was actually established.
268 if (mdb->connected && mdb->db) {
271 rwl_destroy(&mdb->lock);
// Give back the pool buffers obtained in db_init_database().
272 free_pool_memory(mdb->errmsg);
273 free_pool_memory(mdb->cmd);
274 free_pool_memory(mdb->cached_path);
275 free_pool_memory(mdb->fname);
276 free_pool_memory(mdb->path);
277 free_pool_memory(mdb->esc_name);
278 free_pool_memory(mdb->esc_path);
// These strings are freed only when they are non-NULL.
285 if (mdb->db_password) {
286 free(mdb->db_password);
288 if (mdb->db_address) {
289 free(mdb->db_address);
291 if (mdb->db_socket) {
292 free(mdb->db_socket);
// Checked after this handle has been removed: is the list now empty?
295 if (db_list->size() == 0) {
// Raise an M_ABORT message when PQisthreadsafe() reports that the libpq in use
// is not thread-safe. The check is compiled in only when both
// HAVE_BATCH_FILE_INSERT and HAVE_PQISTHREADSAFE are defined.
303 void db_check_backend_thread_safe()
305 #ifdef HAVE_BATCH_FILE_INSERT
306 # ifdef HAVE_PQISTHREADSAFE
307 if (!PQisthreadsafe()) {
308 Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
309 "when using BatchMode.\n"));
// NOTE(review): presumably a per-thread cleanup hook for this backend; no
// PostgreSQL-specific work is visible for it here -- confirm against callers.
315 void db_thread_cleanup()
319 * Return the next unique index (auto-increment) for
320 * the given table. Return NULL on error.
322 * For PostgreSQL, NULL causes the auto-increment value
// PostgreSQL variant of the next-index lookup: the visible statement issues no
// query and does not use jcr, mdb or table; it copies the literal text NULL
// into index.
// index must have room for at least 5 bytes (4 characters plus terminator).
325 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
327 strcpy(index, "NULL");
333 * Escape strings so that PostgreSQL is happy
335 * NOTE! len is the length of the old string. Your new
336 * string must be long enough (max 2*old+1) to hold
337 * the escaped output.
// Escape the first len bytes of old into snew with the connection-aware libpq
// routine, using the connection held in mdb->db.
//   jcr   job context that receives the M_FATAL message on failure
//   snew  output buffer supplied by the caller
//   old   input text
//   len   length of old in bytes
// The length returned by PQescapeStringConn() is not used.
340 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
344 PQescapeStringConn(mdb->db, snew, old, len, &error);
// An escaping failure is reported to the job as M_FATAL.
346 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
347 /* error on encoding, probably invalid multibyte encoding in the source string
348 see PQescapeStringConn documentation for details. */
349 Dmsg0(500, "PQescapeStringConn failed\n");
354 * Submit a general SQL command (cmd), and for each row returned,
355 * the result_handler is called with the ctx.
// Run query on mdb. When result_handler is non-NULL the result set is stored
// and the handler is called once per fetched row with (ctx, column count,
// row); with a NULL handler no rows are fetched here.
// When sql_query() fails, mdb->errmsg is set to a message containing the
// query text and the text from sql_strerror().
357 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
361 Dmsg0(500, "db_sql_query started\n");
364 if (sql_query(mdb, query) != 0) {
365 Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
367 Dmsg0(500, "db_sql_query failed\n");
370 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
372 if (result_handler != NULL) {
373 Dmsg0(500, "db_sql_query invoking handler\n");
374 if ((mdb->result = sql_store_result(mdb)) != NULL) {
// The column count is read once, before the row loop.
375 int num_fields = sql_num_fields(mdb);
377 Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
378 while ((row = sql_fetch_row(mdb)) != NULL) {
380 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
// The handler's return value is tested; a non-zero value takes this branch.
381 if (result_handler(ctx, num_fields, row))
385 sql_free_result(mdb);
390 Dmsg0(500, "db_sql_query finished\n");
// Fetch the row at mdb->row_number from the current result as an array of C
// strings. The row variable starts out as NULL, and the values are only
// gathered when row_number lies in 0..num_rows-1. The array holds the pointers
// returned by PQgetvalue(); the values are not copied.
397 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
400 POSTGRESQL_ROW row = NULL; // by default, return NULL
402 Dmsg0(500, "my_postgresql_fetch_row start\n");
// (Re)allocate the pointer array when there is none yet or it is too small
// for the current column count.
404 if (!mdb->row || mdb->row_size < mdb->num_fields) {
405 int num_fields = mdb->num_fields;
// NOTE(review): the argument is a size_t expression but the format uses %d.
406 Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
409 Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
// Over-allocate by 20 slots; row_size records the allocated slot count.
412 num_fields += 20; /* add a bit extra */
413 mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
414 mdb->row_size = num_fields;
416 // now reset the row_number now that we have the space allocated
420 // if still within the result set
421 if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
422 Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
423 // get each value from this row
424 for (j = 0; j < mdb->num_fields; j++) {
425 mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
426 Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
428 // increment the row number for the next call
433 Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
436 Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
// Largest value length found in column field_num across all num_rows rows of
// the current result. SQL NULLs count as 4 (the text NULL); other values are
// measured with cstrlen().
441 int my_postgresql_max_length(B_DB *mdb, int field_num) {
443 // for a given column, find the max length
450 for (i = 0; i < mdb->num_rows; i++) {
451 if (PQgetisnull(mdb->result, i, field_num)) {
452 this_length = 4; // "NULL"
454 this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
// Keep the running maximum.
457 if (max_length < this_length) {
458 max_length = this_length;
// Return the descriptor of the column at mdb->field_number and advance that
// counter. The descriptor array is (re)allocated when it is missing or smaller
// than num_fields. Each entry is filled with name, max_length, type and flags
// (always 0). max_length comes from my_postgresql_max_length(), which walks
// every row of the result for that column.
465 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
469 Dmsg0(500, "my_postgresql_fetch_field starts\n");
471 if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
475 Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
476 mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
477 mdb->fields_size = mdb->num_fields;
479 for (i = 0; i < mdb->num_fields; i++) {
480 Dmsg1(500, "filling field %d\n", i);
481 mdb->fields[i].name = PQfname(mdb->result, i);
482 mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
483 mdb->fields[i].type = PQftype(mdb->result, i);
484 mdb->fields[i].flags = 0;
486 Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
487 mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
488 mdb->fields[i].flags);
492 // increment field number for the next time around
494 Dmsg0(500, "my_postgresql_fetch_field finishes\n");
// NOTE(review): field_number is used as an index without a visible range
// check, and my_postgresql_query() resets it to -1 -- presumably callers seek
// with my_postgresql_field_seek() first; verify.
495 return &mdb->fields[mdb->field_number++];
// Position the row cursor. The value is stored as given; the range test
// happens later in my_postgresql_fetch_row(), which only reads a row when
// row_number is within 0..num_rows-1.
498 void my_postgresql_data_seek(B_DB *mdb, int row)
500 // set the row number to be returned on the next call
501 // to my_postgresql_fetch_row
502 mdb->row_number = row;
// Set the index of the column descriptor that my_postgresql_fetch_field()
// hands out next. The value is stored as given.
505 void my_postgresql_field_seek(B_DB *mdb, int field)
507 mdb->field_number = field;
511 * Note, if this routine returns 1 (failure), Bacula expects
512 * that no result has been stored.
513 * This is where QUERY_DB comes with Postgresql.
515 * Returns: 0 on success
// Execute query on mdb->db and cache the result metadata on mdb.
// Success means a result status of PGRES_TUPLES_OK or PGRES_COMMAND_OK: then
// num_fields and num_rows are filled in, row_number is set to 0 and
// mdb->status to 0. On the failure path the result is cleared with PQclear()
// and mdb->status is set to 1.
519 int my_postgresql_query(B_DB *mdb, const char *query)
521 Dmsg0(500, "my_postgresql_query started\n");
522 // We are starting a new query. reset everything.
// -1 leaves both cursors unpositioned until a result is available.
524 mdb->row_number = -1;
525 mdb->field_number = -1;
528 PQclear(mdb->result); /* hmm, someone forgot to free?? */
532 Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
// PQexec() is attempted up to 10 times.
534 for (int i=0; i < 10; i++) {
535 mdb->result = PQexec(mdb->db, query);
542 Dmsg1(50, "Query failed: %s\n", query);
546 mdb->status = PQresultStatus(mdb->result);
547 if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
// NOTE(review): query is passed but the format string has no conversion for it.
548 Dmsg1(500, "we have a result\n", query);
550 // how many fields in the set?
551 mdb->num_fields = (int)PQnfields(mdb->result);
552 Dmsg1(500, "we have %d fields\n", mdb->num_fields);
554 mdb->num_rows = PQntuples(mdb->result);
555 Dmsg1(500, "we have %d rows\n", mdb->num_rows);
557 mdb->row_number = 0; /* we can start to fetch something */
558 mdb->status = 0; /* succeed */
560 Dmsg1(50, "Result status failed: %s\n", query);
564 Dmsg0(500, "my_postgresql_query finishing\n");
// NOTE(review): same surplus argument as in the success message above.
568 Dmsg1(500, "we failed\n", query);
569 PQclear(mdb->result);
571 mdb->status = 1; /* failed */
// Release the PGresult held in mdb->result with PQclear().
575 void my_postgresql_free_result(B_DB *mdb)
580 PQclear(mdb->result);
// Look up the current value of the sequence behind table_name's serial key by
// running SELECT currval('<sequence>') directly on mdb->db. If the result
// status is not PGRES_TUPLES_OK, mdb->errmsg is set from PQerrorMessage().
596 int my_postgresql_currval(B_DB *mdb, const char *table_name)
598 // Obtain the current value of the sequence that
599 // provides the serial value for primary key of the table.
601 // currval is local to our session. It is not affected by
602 // other transactions.
604 // Determine the name of the sequence.
605 // PostgreSQL automatically creates a sequence using
606 // <table>_<column>_seq.
607 // At the time of writing, all tables used this format for
608 // their primary key: <table>id
609 // Except for basefiles which has a primary key on baseid.
610 // Therefore, we need to special case that one table.
612 // everything else can use the PostgreSQL formula.
// NOTE(review): sequence is NAMEDATALEN-1 bytes and is filled with bounded
// copies; presumably a longer name is truncated rather than rejected --
// confirm against bstrncpy/bstrncat.
614 char sequence[NAMEDATALEN-1];
615 char query [NAMEDATALEN+50];
// The table name is matched without regard to case.
619 if (strcasecmp(table_name, "basefiles") == 0) {
620 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
// General case: <table>_<table>id, completed with _seq below.
622 bstrncpy(sequence, table_name, sizeof(sequence));
623 bstrncat(sequence, "_", sizeof(sequence));
624 bstrncat(sequence, table_name, sizeof(sequence));
625 bstrncat(sequence, "id", sizeof(sequence));
628 bstrncat(sequence, "_seq", sizeof(sequence));
629 bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
631 Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
// PQexec() is attempted up to 10 times; the result is kept in a local
// variable, not in mdb->result.
632 for (int i=0; i < 10; i++) {
633 result = PQexec(mdb->db, query);
640 Dmsg1(50, "Query failed: %s\n", query);
// NOTE(review): this debug message and the "getting value" one below have no
// trailing newline, unlike the other Dmsg calls in this function.
644 Dmsg0(500, "exec done");
646 if (PQresultStatus(result) == PGRES_TUPLES_OK) {
647 Dmsg0(500, "getting value");
// NOTE(review): atoi() yields an int and reports no conversion errors.
648 id = atoi(PQgetvalue(result, 0, 0));
649 Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
651 Dmsg1(50, "Result status failed: %s\n", query);
652 Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
661 #ifdef HAVE_BATCH_FILE_INSERT
// Start batch mode: create the temporary table batch, then issue
// COPY batch FROM STDIN on the connection.
// The CREATE step is treated as failed when my_postgresql_query() returns 1;
// the COPY step is accepted only when the result status is PGRES_COPY_IN.
663 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
665 const char *query = "COPY batch FROM STDIN";
667 Dmsg0(500, "my_postgresql_batch_start started\n");
669 if (my_postgresql_query(mdb,
670 "CREATE TEMPORARY TABLE batch ("
676 "md5 varchar)") == 1)
678 Dmsg0(500, "my_postgresql_batch_start failed\n");
682 // We are starting a new query. reset everything.
684 mdb->row_number = -1;
685 mdb->field_number = -1;
// Drop the result left behind by the CREATE statement before reusing
// mdb->result for the COPY command.
687 my_postgresql_free_result(mdb);
// PQexec() is attempted up to 10 times.
689 for (int i=0; i < 10; i++) {
690 mdb->result = PQexec(mdb->db, query);
697 Dmsg1(50, "Query failed: %s\n", query);
701 mdb->status = PQresultStatus(mdb->result);
702 if (mdb->status == PGRES_COPY_IN) {
703 // how many fields in the set?
704 mdb->num_fields = (int) PQnfields(mdb->result);
708 Dmsg1(50, "Result status failed: %s\n", query);
712 Dmsg0(500, "my_postgresql_batch_start finishing\n");
// Failure path: keep the server message in errmsg, then clear the result.
717 Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
719 PQclear(mdb->result);
724 /* set error to something to abort operation */
// Finish the COPY started by my_postgresql_batch_start(). The error argument
// is forwarded unchanged to PQputCopyEnd(). Afterwards PQgetResult() must
// report PGRES_COMMAND_OK, otherwise errmsg is set from PQerrorMessage().
725 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
730 Dmsg0(500, "my_postgresql_batch_end started\n");
732 if (!mdb) { /* no files ? */
// PQputCopyEnd() is repeated while it returns 0 and count is not exhausted.
737 res = PQputCopyEnd(mdb->db, error);
738 } while (res == 0 && --count > 0);
746 Dmsg0(500, "we failed\n");
748 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
751 /* Check command status and return to normal libpq state */
752 result = PQgetResult(mdb->db);
753 if (PQresultStatus(result) != PGRES_COMMAND_OK) {
754 Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
759 Dmsg0(500, "my_postgresql_batch_end finishing\n");
// Send one file record to the server as a line of COPY data.
// The line is tab separated and newline terminated, in this order:
// FileIndex, JobId, escaped path, escaped name, attr, digest.
764 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
// Each escape buffer is grown to 2*length+1 bytes before the name or path is
// escaped into it.
772 mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
773 my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
775 mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
776 my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
// Tests for a missing or empty digest.
778 if (ar->Digest == NULL || ar->Digest[0] == 0) {
784 len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
785 ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
786 mdb->esc_name, ar->attr, digest);
// PQputCopyData() is repeated while it returns 0 and count is not exhausted.
789 res = PQputCopyData(mdb->db,
792 } while (res == 0 && --count > 0);
801 Dmsg0(500, "we failed\n");
803 Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
806 Dmsg0(500, "my_postgresql_batch_insert finishing\n");
811 #endif /* HAVE_BATCH_FILE_INSERT */
814 * Escape strings so that PostgreSQL is happy on COPY
816 * NOTE! len is the length of the old string. Your new
817 * string must be long enough (max 2*old+1) to hold
818 * the escaped output.
// Escape src for use as COPY input and write the result to dest.
//   dest  output buffer supplied by the caller
//   src   input text
//   len   number of input bytes to consider
// The loop runs until len is used up or a NUL byte is met in src, whichever
// comes first.
820 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
822 /* we have to escape \t, \n, \r, \ */
825 while (len > 0 && *src) {
860 #ifdef HAVE_BATCH_FILE_INSERT
// SQL text used by batch mode.
// Each lock statement opens a transaction with BEGIN and locks one table in
// SHARE ROW EXCLUSIVE mode; the unlock statement is a COMMIT.
861 const char *my_pg_batch_lock_path_query =
862 "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
865 const char *my_pg_batch_lock_filename_query =
866 "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
868 const char *my_pg_batch_unlock_tables_query = "COMMIT";
// The fill statements insert the distinct Path and Name values found in batch
// into the Path and Filename tables.
870 const char *my_pg_batch_fill_path_query =
871 "INSERT INTO Path (Path) "
872 "SELECT a.Path FROM "
873 "(SELECT DISTINCT Path FROM batch) AS a "
874 "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
877 const char *my_pg_batch_fill_filename_query =
878 "INSERT INTO Filename (Name) "
879 "SELECT a.Name FROM "
880 "(SELECT DISTINCT Name FROM batch) as a "
882 "(SELECT Name FROM Filename WHERE Name = a.Name)";
883 #endif /* HAVE_BATCH_FILE_INSERT */
885 #endif /* HAVE_POSTGRESQL */