2 Bacula® - The Network Backup Solution
4 Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Bacula Catalog Database routines specific to PostgreSQL
30 * These are PostgreSQL specific routines
32 * Dan Langille, December 2003
33 * based upon work done by Kern Sibbald, March 2000
35 * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
40 #ifdef HAVE_POSTGRESQL
45 #include "postgres_ext.h" /* needed for NAMEDATALEN */
46 #include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */
47 #include "bdb_postgresql.h"
49 /* -----------------------------------------------------------------------
51 * PostgreSQL dependent defines and subroutines
53 * -----------------------------------------------------------------------
57 * List of open databases
/* List of B_DB_POSTGRESQL objects; stays NULL until the constructor creates it for the first object. */
59 static dlist *db_list = NULL;
/* Statically initialised mutex; db_init_database() takes it with P(mutex) before it examines db_list. */
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Construct a PostgreSQL catalog object: record the connection parameters,
 * decide whether batch insert is usable, allocate the pool-memory work
 * buffers and register the object in db_list.
 * No connection is made here; PQsetdbLogin() is called from db_open_database().
 */
63 B_DB_POSTGRESQL::B_DB_POSTGRESQL(JCR *jcr,
64 const char *db_driver,
67 const char *db_password,
68 const char *db_address,
70 const char *db_socket,
71 bool mult_db_connections,
72 bool disable_batch_insert)
75 * Initialize the parent class members.
77 m_db_interface_type = SQL_INTERFACE_TYPE_POSTGRESQL;
78 m_db_type = SQL_TYPE_POSTGRESQL;
/* The driver name is stored as the fixed string below, not copied from the db_driver argument. */
79 m_db_driver = bstrdup("PostgreSQL");
80 m_db_name = bstrdup(db_name);
81 m_db_user = bstrdup(db_user);
/* NOTE(review): password, address and socket may be optional; any NULL guards around these copies are not shown here -- confirm. */
83 m_db_password = bstrdup(db_password);
86 m_db_address = bstrdup(db_address);
89 m_db_socket = bstrdup(db_socket);
/* A caller-requested disable wins: batch insert is flagged as disabled and as unavailable. */
92 if (disable_batch_insert) {
93 m_disabled_batch_insert = true;
94 m_have_batch_insert = false;
/* Otherwise availability is decided by the build-time macros tested below. */
96 m_disabled_batch_insert = false;
97 #if defined(USE_BATCH_FILE_INSERT)
98 #if defined(HAVE_POSTGRESQL_BATCH_FILE_INSERT) || defined(HAVE_PQISTHREADSAFE)
99 #ifdef HAVE_PQISTHREADSAFE
/* When libpq can be asked, batch insert is enabled only if PQisthreadsafe() reports a thread-safe library. */
100 m_have_batch_insert = PQisthreadsafe();
102 m_have_batch_insert = true;
103 #endif /* HAVE_PQISTHREADSAFE */
105 m_have_batch_insert = true;
106 #endif /* HAVE_POSTGRESQL_BATCH_FILE_INSERT || HAVE_PQISTHREADSAFE */
/* Built without USE_BATCH_FILE_INSERT: batch insert is never available. */
108 m_have_batch_insert = false;
109 #endif /* USE_BATCH_FILE_INSERT */
/* Work buffers taken from pool memory; the same seven buffers are released in db_close_database(). */
111 errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
113 cmd = get_pool_memory(PM_EMSG); /* get command buffer */
114 cached_path = get_pool_memory(PM_FNAME);
117 fname = get_pool_memory(PM_FNAME);
118 path = get_pool_memory(PM_FNAME);
119 esc_name = get_pool_memory(PM_FNAME);
120 esc_path = get_pool_memory(PM_FNAME);
/* Transactions are permitted only when the caller asked for multiple DB connections. */
121 m_allow_transactions = mult_db_connections;
124 * Initialize the private members.
130 * Put the db in the list.
/* The list is created on first use, anchored on this object's m_link member. */
132 if (db_list == NULL) {
133 db_list = New(dlist(this, &this->m_link));
135 db_list->append(this);
/*
 * Destructor. No statements of its body are shown here; the connection, lock and
 * pool buffers are released in db_close_database().
 */
138 B_DB_POSTGRESQL::~B_DB_POSTGRESQL()
143 * Check that the database corresponds to the encoding we want
/*
 * Ask the server for the database encoding and compare it with SQL_ASCII.
 *
 *  jcr  job control record used for job messages
 *  mdb  open PostgreSQL catalog object to query
 *
 * The comparison result is kept in ret.
 * NOTE(review): the return statements are not shown here; presumably ret is returned -- confirm.
 */
145 static bool pgsql_check_database_encoding(JCR *jcr, B_DB_POSTGRESQL *mdb)
/* The result must be stored so the single row can be fetched below. */
150 if (!mdb->sql_query("SELECT getdatabaseencoding()", QF_STORE_RESULT)) {
151 Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
/* A successful query that yields no row is reported as an error too. */
155 if ((row = mdb->sql_fetch_row()) == NULL) {
156 Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), mdb->sql_strerror());
157 Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
/* row[0] is the encoding name reported by the server. */
159 ret = bstrcmp(row[0], "SQL_ASCII");
163 * If we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too
165 mdb->sql_query("SET client_encoding TO 'SQL_ASCII'");
169 * Something is wrong with database encoding
172 _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
173 mdb->get_db_name(), row[0]);
/* A mismatch is reported as a warning (M_WARNING) and echoed at debug level 50. */
174 Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
175 Dmsg1(50, "%s", mdb->errmsg);
182 * Now actually open the database. This can generate errors,
183 * which are returned in the errmsg
185 * DO NOT close the database or delete mdb here !!!!
/*
 * Open the connection to the PostgreSQL server for this catalog object.
 *
 * Visible steps: initialise the read/write lock, format the port number,
 * connect with PQsetdbLogin() (several attempts), verify the catalog table
 * version, then apply session settings and check the database encoding.
 * Failures leave a description in errmsg.
 * NOTE(review): the return statements are not shown here -- confirm which
 * failures return false.
 */
187 bool B_DB_POSTGRESQL::db_open_database(JCR *jcr)
199 if ((errstat=rwl_init(&m_lock)) != 0) {
201 Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
202 be.bstrerror(errstat));
/* The numeric port is rendered as decimal text in buf. */
/* NOTE(review): presumably `port` below refers to this text when a port is configured -- confirm. */
207 bsnprintf(buf, sizeof(buf), "%d", m_db_port);
213 /* If connection fails, try at 5 sec intervals for 30 seconds. */
/* At most six connection attempts are made. */
214 for (int retry=0; retry < 6; retry++) {
215 /* connect to the database */
216 m_db_handle = PQsetdbLogin(
217 m_db_address, /* default = localhost */
218 port, /* default port */
219 NULL, /* pg options */
220 NULL, /* tty, ignored */
221 m_db_name, /* database name */
222 m_db_user, /* login name */
223 m_db_password); /* password */
225 /* If no connect, try once more in case it is a timing problem */
226 if (PQstatus(m_db_handle) == CONNECTION_OK) {
232 Dmsg0(50, "pg_real_connect done\n");
/* NOTE(review): this debug line writes the database password in clear text at debug level 50. */
233 Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
234 (m_db_password == NULL) ? "(NULL)" : m_db_password);
/* Still not connected after the attempts above: record a message listing the likely causes. */
236 if (PQstatus(m_db_handle) != CONNECTION_OK) {
237 Mmsg2(&errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
238 "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
239 m_db_name, m_db_user);
/* The catalog schema version is checked before the session settings are applied. */
244 if (!check_tables_version(jcr, this)) {
/* Session setting: date style 'ISO, YMD'. The result of the query is not checked. */
248 sql_query("SET datestyle TO 'ISO, YMD'");
251 * Tell PostgreSQL we are using standard conforming strings
252 * and avoid warnings such as:
253 * WARNING: nonstandard use of \\ in a string literal
255 sql_query("SET standard_conforming_strings=on");
258 * Check that encoding is SQL_ASCII
/* The boolean result of the encoding check is not used here. */
260 pgsql_check_database_encoding(jcr, this);
/*
 * Release this catalog object's use of the database.
 *
 * Any open transaction is ended first. Once m_ref_count is zero the object is
 * removed from db_list, the server connection is closed, the lock is destroyed
 * and the pool buffers allocated by the constructor are freed.
 * NOTE(review): the statement that decrements m_ref_count is not shown here -- confirm.
 */
269 void B_DB_POSTGRESQL::db_close_database(JCR *jcr)
271 db_end_transaction(jcr);
275 if (m_ref_count == 0) {
276 db_list->remove(this);
/* PQfinish() is called only when a connection was established and the handle is non-NULL. */
277 if (m_connected && m_db_handle) {
278 PQfinish(m_db_handle);
280 rwl_destroy(&m_lock);
/* Counterpart of the get_pool_memory() calls in the constructor. */
281 free_pool_memory(errmsg);
282 free_pool_memory(cmd);
283 free_pool_memory(cached_path);
284 free_pool_memory(fname);
285 free_pool_memory(path);
286 free_pool_memory(esc_name);
287 free_pool_memory(esc_path);
/* NOTE(review): the action taken when the list becomes empty is not shown here; presumably the list itself is released -- confirm. */
310 if (db_list->size() == 0) {
/* Thread cleanup entry point of the driver. NOTE(review): no statements of its body are shown here. */
318 void B_DB_POSTGRESQL::db_thread_cleanup(void)
323 * Escape strings so that PostgreSQL is happy
325 * NOTE! len is the length of the old string. Your new
326 * string must be long enough (max 2*old+1) to hold
327 * the escaped output.
/*
 * Escape len bytes of old into snew with PQescapeStringConn() on the open
 * connection m_db_handle.
 *
 *  jcr   job control record used for the fatal job message
 *  snew  destination buffer supplied by the caller
 *  old   source string
 *  len   length of old
 */
329 void B_DB_POSTGRESQL::db_escape_string(JCR *jcr, char *snew, char *old, int len)
/* libpq reports an encoding problem through the error out-parameter. */
333 PQescapeStringConn(m_db_handle, snew, old, len, &error);
/* NOTE(review): the test on `error` guarding the lines below is not shown here. */
335 Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
336 /* error on encoding, probably invalid multibyte encoding in the source string
337 see PQescapeStringConn documentation for details. */
338 Dmsg0(500, "PQescapeStringConn failed\n");
343 * Escape binary so that PostgreSQL is happy
/*
 * Escape a binary object of len bytes with PQescapeByteaConn() and return the
 * escaped form held in esc_obj.
 *
 * The escaped bytes are copied into esc_obj, which is first grown to hold
 * new_len+1 bytes. The same buffer is reused on every call, so the returned
 * pointer is valid only until the next call.
 * NOTE(review): release of the libpq buffer `obj` is not shown here -- confirm.
 */
346 char *B_DB_POSTGRESQL::db_escape_object(JCR *jcr, char *old, int len)
351 obj = PQescapeByteaConn(m_db_handle, (unsigned const char *)old, len, &new_len);
/* A NULL result from libpq is reported as a fatal job error. */
353 Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n"));
356 esc_obj = check_pool_memory_size(esc_obj, new_len+1);
357 memcpy(esc_obj, obj, new_len);
362 return (char *)esc_obj;
366 * Unescape binary object so that PostgreSQL is happy
/*
 * Convert an escaped binary object back to raw bytes with PQunescapeBytea().
 *
 *  jcr           job control record used for the fatal job message
 *  from          escaped text to decode
 *  expected_len  not referenced in the lines shown here
 *  dest          pool buffer, grown to new_len+1 bytes, that receives the bytes
 *  dest_len      out-parameter printed in the final debug message
 *
 * NOTE(review): the assignment to *dest_len is not shown here -- confirm.
 */
369 void B_DB_POSTGRESQL::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
370 POOLMEM **dest, int32_t *dest_len)
381 obj = PQunescapeBytea((unsigned const char *)from, &new_len);
/* NOTE(review): the message names PQunescapeByteaConn, but the function called above is PQunescapeBytea. */
384 Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n"));
388 *dest = check_pool_memory_size(*dest, new_len+1);
389 memcpy(*dest, obj, new_len);
/* NOTE(review): 010 is an octal literal, so the debug level used here is 8. */
394 Dmsg1(010, "obj size: %d\n", *dest_len);
398 * Start a transaction. This groups inserts and makes things
399 * much more efficient. Usually started when inserting
/*
 * Prepare the job's attribute buffers and, when transactions are allowed,
 * make sure a transaction is open.
 *
 *  jcr  job control record; its attr and ar members are set up here
 */
402 void B_DB_POSTGRESQL::db_start_transaction(JCR *jcr)
/* NOTE(review): the guards around these two allocations are not shown here; presumably each happens only when the pointer is still unset -- confirm. */
405 jcr->attr = get_pool_memory(PM_FNAME);
408 jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
412 * This is turned off because transactions break
413 * if multiple simultaneous jobs are run.
415 if (!m_allow_transactions) {
421 * Allow only 25,000 changes per transaction
/* An over-long transaction is committed first; the test below then opens a fresh one. */
423 if (m_transaction && changes > 25000) {
424 db_end_transaction(jcr);
426 if (!m_transaction) {
427 sql_query("BEGIN"); /* begin transaction */
428 Dmsg0(400, "Start PosgreSQL transaction\n");
429 m_transaction = true;
/*
 * Flush a cached attribute record, if any, and commit the open transaction.
 *
 *  jcr  job control record; may be NULL, in which case the flush is skipped
 */
434 void B_DB_POSTGRESQL::db_end_transaction(JCR *jcr)
/* A cached attribute is written out first; a failure is reported as a fatal job error. */
436 if (jcr && jcr->cached_attribute) {
437 Dmsg0(400, "Flush last cached attribute.\n");
438 if (!db_create_attributes_record(jcr, this, jcr->ar)) {
439 Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
441 jcr->cached_attribute = false;
444 if (!m_allow_transactions) {
/* NOTE(review): the condition guarding the COMMIT is not shown here; presumably it runs only while m_transaction is set -- confirm. */
450 sql_query("COMMIT"); /* end transaction */
451 m_transaction = false;
452 Dmsg1(400, "End PostgreSQL transaction changes=%d\n", changes);
459 * Submit a general SQL command (cmd), and for each row returned,
460 * the result_handler is called with the ctx.
/*
 * Run query and hand each returned row to result_handler.
 *
 *  query           SQL text to execute
 *  result_handler  callback called as result_handler(ctx, m_num_fields, row);
 *                  may be NULL, in which case no rows are fetched
 *  ctx             opaque pointer passed through to the callback
 *
 * On query failure a "Query failed" message is stored in errmsg.
 * NOTE(review): the return statements are not shown here -- confirm.
 */
462 bool B_DB_POSTGRESQL::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
467 Dmsg1(500, "db_sql_query starts with '%s'\n", query);
/* The result set is stored so rows can be fetched one by one below. */
470 if (!sql_query(query, QF_STORE_RESULT)) {
471 Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
472 Dmsg0(500, "db_sql_query failed\n");
477 Dmsg0(500, "db_sql_query succeeded. checking handler\n");
479 if (result_handler != NULL) {
480 Dmsg0(500, "db_sql_query invoking handler\n");
481 while ((row = sql_fetch_row()) != NULL) {
482 Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
/* NOTE(review): the statement run when the handler returns non-zero is not shown here; presumably the loop stops -- confirm. */
483 if (result_handler(ctx, m_num_fields, row))
489 Dmsg0(500, "db_sql_query finished\n");
497 * Note, if this routine returns false (failure), Bacula expects
498 * that no result has been stored.
499 * This is where QUERY_DB comes with Postgresql.
501 * Returns: true on success
/*
 * Execute query with PQexec() and record the outcome in the object.
 *
 *  query  SQL text to execute
 *  flags  query flags such as QF_STORE_RESULT; not referenced in the lines shown here
 *
 * On success m_num_fields, m_num_rows and m_row_number describe the new result
 * and m_status is 0; on failure m_status is 1.
 */
505 bool B_DB_POSTGRESQL::sql_query(const char *query, int flags)
510 Dmsg1(500, "sql_query starts with '%s'\n", query);
512 * We are starting a new query. reset everything.
/* A result left over from an earlier query is released before the new one is run. */
519 PQclear(m_result); /* hmm, someone forgot to free?? */
/* Up to ten attempts. NOTE(review): the loop-exit test and any delay between attempts are not shown here. */
523 for (i = 0; i < 10; i++) {
524 m_result = PQexec(m_db_handle, query);
531 Dmsg1(50, "Query failed: %s\n", query);
/* Both row-returning and non-row-returning statements count as success. */
535 m_status = PQresultStatus(m_result);
536 if (m_status == PGRES_TUPLES_OK || m_status == PGRES_COMMAND_OK) {
537 Dmsg0(500, "we have a result\n");
540 * How many fields in the set?
542 m_num_fields = (int)PQnfields(m_result);
543 Dmsg1(500, "we have %d fields\n", m_num_fields);
545 m_num_rows = PQntuples(m_result);
546 Dmsg1(500, "we have %d rows\n", m_num_rows);
548 m_row_number = 0; /* we can start to fetch something */
/* From here on m_status holds the driver's own 0/1 code rather than the libpq status. */
549 m_status = 0; /* succeed */
552 Dmsg1(50, "Result status failed: %s\n", query);
556 Dmsg0(500, "sql_query finishing\n");
560 Dmsg0(500, "we failed\n");
563 m_status = 1; /* failed */
/*
 * Discard the current result. The visible statement resets the row and field counts to zero.
 * NOTE(review): the statements that release m_result and the row/field arrays are not shown here.
 */
569 void B_DB_POSTGRESQL::sql_free_result(void)
584 m_num_rows = m_num_fields = 0;
/*
 * Return the next row of the current result, or NULL when no row is available.
 *
 * The m_rows array holds one char pointer per field. Each entry is the pointer
 * returned by PQgetvalue(); the field text itself is not copied.
 */
588 SQL_ROW B_DB_POSTGRESQL::sql_fetch_row(void)
591 SQL_ROW row = NULL; /* by default, return NULL */
593 Dmsg0(500, "sql_fetch_row start\n");
/* The pointer array is (re)allocated when absent or too small for the current field count. */
595 if (!m_rows || m_rows_size < m_num_fields) {
597 Dmsg0(500, "sql_fetch_row freeing space\n");
/* NOTE(review): the argument is a sizeof expression (size_t) but is printed with %d. */
600 Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
601 m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
602 m_rows_size = m_num_fields;
605 * Now reset the row_number now that we have the space allocated
611 * If still within the result set
613 if (m_row_number >= 0 && m_row_number < m_num_rows) {
614 Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
616 * Get each value from this row
618 for (j = 0; j < m_num_fields; j++) {
619 m_rows[j] = PQgetvalue(m_result, m_row_number, j);
620 Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
623 * Increment the row number for the next call
628 Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
631 Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
/* Return libpq's error message text for the connection m_db_handle. */
636 const char *B_DB_POSTGRESQL::sql_strerror(void)
638 return PQerrorMessage(m_db_handle);
/*
 *  row  row index to position on
 * NOTE(review): the assignment itself is not shown here; presumably it stores row in m_row_number -- confirm.
 */
641 void B_DB_POSTGRESQL::sql_data_seek(int row)
644 * Set the row number to be returned on the next call to sql_fetch_row
/*
 * Return the row count reported for the last command.
 * PQcmdTuples() yields the count as text; it is parsed with str_to_int32(),
 * cast to unsigned and then returned as int.
 */
649 int B_DB_POSTGRESQL::sql_affected_rows(void)
651 return (unsigned) str_to_int32(PQcmdTuples(m_result));
/*
 * Run an INSERT and fetch the key value it generated.
 *
 *  query       the INSERT statement to execute
 *  table_name  table inserted into; used to derive the sequence name
 *
 * After the insert, "SELECT currval('<sequence>')" is executed on the same
 * connection and its single value is parsed with str_to_uint64().
 * NOTE(review): the return statements are not shown here -- confirm the value
 * returned on each failure path.
 */
654 uint64_t B_DB_POSTGRESQL::sql_insert_autokey_record(const char *query, const char *table_name)
/* Fixed-size buffers sized from NAMEDATALEN; the bstrncpy/bstrncat/bsnprintf calls below are all bounded by sizeof. */
658 char sequence[NAMEDATALEN-1];
659 char getkeyval_query[NAMEDATALEN+50];
663 * First execute the insert query and then retrieve the currval.
665 if (!sql_query(query)) {
/* Exactly one inserted row is expected. */
669 m_num_rows = sql_affected_rows();
670 if (m_num_rows != 1) {
677 * Obtain the current value of the sequence that
678 * provides the serial value for primary key of the table.
680 * currval is local to our session. It is not affected by
681 * other transactions.
683 * Determine the name of the sequence.
684 * PostgreSQL automatically creates a sequence using
685 * <table>_<column>_seq.
686 * At the time of writing, all tables used this format for
687 * for their primary key: <table>id
688 * Except for basefiles which has a primary key on baseid.
689 * Therefore, we need to special case that one table.
691 * everything else can use the PostgreSQL formula.
/* The table name is compared case-insensitively. */
693 if (strcasecmp(table_name, "basefiles") == 0) {
694 bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
/* Visible result for other tables: "<table>_<table>id", completed by "_seq" below. */
696 bstrncpy(sequence, table_name, sizeof(sequence));
697 bstrncat(sequence, "_", sizeof(sequence));
698 bstrncat(sequence, table_name, sizeof(sequence));
699 bstrncat(sequence, "id", sizeof(sequence));
702 bstrncat(sequence, "_seq", sizeof(sequence));
703 bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT currval('%s')", sequence);
705 Dmsg1(500, "sql_insert_autokey_record executing query '%s'\n", getkeyval_query);
/* Up to ten attempts. NOTE(review): the loop-exit test is not shown here. */
706 for (i = 0; i < 10; i++) {
707 pg_result = PQexec(m_db_handle, getkeyval_query);
714 Dmsg1(50, "Query failed: %s\n", getkeyval_query);
718 Dmsg0(500, "exec done");
720 if (PQresultStatus(pg_result) == PGRES_TUPLES_OK) {
721 Dmsg0(500, "getting value");
722 id = str_to_uint64(PQgetvalue(pg_result, 0, 0));
/* NOTE(review): id is assigned from str_to_uint64() but printed with %d. */
723 Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(pg_result, 0, 0), id);
725 Dmsg1(50, "Result status failed: %s\n", getkeyval_query);
726 Mmsg1(&errmsg, _("error fetching currval: %s\n"), PQerrorMessage(m_db_handle));
/*
 * Return the descriptor of the next field of the current result and advance
 * m_field_number.
 *
 * When the descriptor array is absent or too small it is allocated and filled
 * for every field: name, type, flags (always 0) and the longest value over all
 * rows of that column.
 * NOTE(review): no bound check on m_field_number is visible before the return -- confirm.
 */
735 SQL_FIELD *B_DB_POSTGRESQL::sql_fetch_field(void)
741 Dmsg0(500, "sql_fetch_field starts\n");
743 if (!m_fields || m_fields_size < m_num_fields) {
748 Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
749 m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
750 m_fields_size = m_num_fields;
752 for (i = 0; i < m_num_fields; i++) {
753 Dmsg1(500, "filling field %d\n", i);
754 m_fields[i].name = PQfname(m_result, i);
755 m_fields[i].type = PQftype(m_result, i);
756 m_fields[i].flags = 0;
759 * For a given column, find the max length.
/* Every row of the result is visited for this column, so filling the table costs rows x fields lookups. */
762 for (j = 0; j < m_num_rows; j++) {
763 if (PQgetisnull(m_result, j, i)) {
764 this_length = 4; /* "NULL" */
766 this_length = cstrlen(PQgetvalue(m_result, j, i));
769 if (max_length < this_length) {
770 max_length = this_length;
773 m_fields[i].max_length = max_length;
/* The value printed as IsNull is the flags member, which was set to 0 above. */
775 Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
776 m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
781 * Increment field number for the next time around
783 return &m_fields[m_field_number++];
/*
 * Classify a field by the field_type value passed in.
 * NOTE(review): the case labels and return values of the switch are not shown here.
 */
786 bool B_DB_POSTGRESQL::sql_field_is_not_null(int field_type)
788 switch (field_type) {
/*
 * Classify a field by the field_type value passed in; per the note below the
 * values compared are type OIDs from pg_type.
 * NOTE(review): the case labels and return values of the switch are not shown here.
 */
796 bool B_DB_POSTGRESQL::sql_field_is_numeric(int field_type)
799 * TEMP: the following is taken from select OID, typname from pg_type;
801 switch (field_type) {
814 * Escape strings so that PostgreSQL is happy on COPY
816 * NOTE! len is the length of the old string. Your new
817 * string must be long enough (max 2*old+1) to hold
818 * the escaped output.
/*
 * Escape src for use as a field in COPY input.
 *
 *  dest  output buffer supplied by the caller
 *  src   input text
 *  len   maximum number of input bytes to examine
 *
 * NOTE(review): the loop body and the return statement are not shown here.
 */
820 static char *pgsql_copy_escape(char *dest, char *src, size_t len)
822 /* we have to escape \t, \n, \r, \ */
/* Processing stops at len bytes or at the first NUL byte in src, whichever comes first. */
825 while (len > 0 && *src) {
/*
 * Begin batch mode: create the temporary table "batch" and put the connection
 * into COPY-in state with "COPY batch FROM STDIN".
 *
 *  jcr  job control record; not referenced in the lines shown here
 *
 * On failure a message built from PQerrorMessage() is stored in errmsg.
 * NOTE(review): the return statements are not shown here -- confirm.
 */
860 bool B_DB_POSTGRESQL::sql_batch_start(JCR *jcr)
862 const char *query = "COPY batch FROM STDIN";
864 Dmsg0(500, "sql_batch_start started\n");
/* Step 1: the temporary table; its column list is not shown here. */
866 if (!sql_query("CREATE TEMPORARY TABLE batch ("
874 Dmsg0(500, "sql_batch_start failed\n");
879 * We are starting a new query. reset everything.
/* Step 2: issue the COPY command, with up to ten attempts. NOTE(review): the loop-exit test is not shown here. */
887 for (int i=0; i < 10; i++) {
888 m_result = PQexec(m_db_handle, query);
895 Dmsg1(50, "Query failed: %s\n", query);
/* PGRES_COPY_IN means the server is now waiting for rows sent with PQputCopyData(). */
899 m_status = PQresultStatus(m_result);
900 if (m_status == PGRES_COPY_IN) {
902 * How many fields in the set?
904 m_num_fields = (int) PQnfields(m_result);
908 Dmsg1(50, "Result status failed: %s\n", query);
912 Dmsg0(500, "sql_batch_start finishing\n");
917 Mmsg1(&errmsg, _("error starting batch mode: %s"), PQerrorMessage(m_db_handle));
925 * Set error to something to abort operation
/*
 * Finish batch mode: end the COPY with PQputCopyEnd() and check the command
 * result with PQgetResult().
 *
 *  jcr    job control record; not referenced in the lines shown here
 *  error  passed unchanged to PQputCopyEnd()
 *
 * NOTE(review): the initial value of count and the return statements are not shown here.
 */
927 bool B_DB_POSTGRESQL::sql_batch_end(JCR *jcr, const char *error)
933 Dmsg0(500, "sql_batch_end started\n");
/* A return of 0 from PQputCopyEnd() is retried until count runs out. */
936 res = PQputCopyEnd(m_db_handle, error);
937 } while (res == 0 && --count > 0);
945 Dmsg0(500, "we failed\n");
947 Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
948 Dmsg1(500, "failure %s\n", errmsg);
951 /* Check command status and return to normal libpq state */
952 pg_result = PQgetResult(m_db_handle);
953 if (PQresultStatus(pg_result) != PGRES_COMMAND_OK) {
954 Mmsg1(&errmsg, _("error ending batch mode: %s"), PQerrorMessage(m_db_handle));
959 Dmsg0(500, "sql_batch_end finishing\n");
/*
 * Send one attribute record to the server as a COPY data row.
 *
 *  jcr  job control record; not referenced in the lines shown here
 *  ar   attribute record supplying FileIndex, JobId, attr, Digest and DeltaSeq
 *
 * The file name and path come from the object's fname/path buffers and are
 * escaped into esc_name/esc_path first.
 * NOTE(review): the initial value of count and the return statements are not shown here.
 */
964 bool B_DB_POSTGRESQL::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
/* Each escape buffer is sized for twice the input length plus one byte. */
972 esc_name = check_pool_memory_size(esc_name, fnl*2+1);
973 pgsql_copy_escape(esc_name, fname, fnl);
975 esc_path = check_pool_memory_size(esc_path, pnl*2+1);
976 pgsql_copy_escape(esc_path, path, pnl);
/* NOTE(review): the value used for an absent or empty digest is not shown here. */
978 if (ar->Digest == NULL || ar->Digest[0] == 0) {
/* Row layout: tab-separated FileIndex, JobId, path, name, attr, digest, DeltaSeq, ending in a newline. */
984 len = Mmsg(cmd, "%u\t%s\t%s\t%s\t%s\t%s\t%u\n",
985 ar->FileIndex, edit_int64(ar->JobId, ed1), esc_path,
986 esc_name, ar->attr, digest, ar->DeltaSeq);
/* A return of 0 from PQputCopyData() is retried until count runs out. */
989 res = PQputCopyData(m_db_handle, cmd, len);
990 } while (res == 0 && --count > 0);
999 Dmsg0(500, "we failed\n");
1001 Mmsg1(&errmsg, _("error copying in batch mode: %s"), PQerrorMessage(m_db_handle));
1002 Dmsg1(500, "failure %s\n", errmsg);
1005 Dmsg0(500, "sql_batch_insert finishing\n");
1011 * Initialize database data structure. In principle this should
1012 * never have errors, or it is really fatal.
/*
 * Return a catalog object for the given PostgreSQL database, reusing an
 * already registered one where allowed.
 *
 *  mult_db_connections   when true, db_list is not searched
 *  disable_batch_insert  forwarded to the B_DB_POSTGRESQL constructor
 *  (the remaining parameters are the connection settings, used for matching
 *   and forwarded to the constructor)
 *
 * NOTE(review): the unlock of mutex and the return statements are not shown here.
 */
1014 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name,
1015 const char *db_user, const char *db_password,
1016 const char *db_address, int db_port,
1017 const char *db_socket, bool mult_db_connections,
1018 bool disable_batch_insert)
1020 B_DB_POSTGRESQL *mdb = NULL;
/* NOTE(review): the test guarding this message is not shown here; presumably it fires when db_user is missing -- confirm. */
1023 Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
1026 P(mutex); /* lock DB queue */
/* Reuse is attempted only when a list exists and the caller did not ask for separate connections. */
1027 if (db_list && !mult_db_connections) {
1029 * Look to see if DB already open
1031 foreach_dlist(mdb, db_list) {
/* A match is decided on driver, database name, address and port. */
1032 if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1033 Dmsg1(100, "DB REopen %s\n", db_name);
1034 mdb->increment_refcount();
/* No reusable object: construct a new one; its constructor registers it in db_list. */
1039 Dmsg0(100, "db_init_database first time\n");
1040 mdb = New(B_DB_POSTGRESQL(jcr, db_driver, db_name, db_user, db_password,
1041 db_address, db_port, db_socket,
1042 mult_db_connections, disable_batch_insert));
1049 #endif /* HAVE_POSTGRESQL */