X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Fcats%2Fpostgresql.c;h=9407978d67ce1e751857298f0b5d5601511fd4ff;hb=f05712c0b5add5fb7953219ee20e734455cd8c16;hp=af7b8d312ea71f7440dabccee5f73c5dc3d4385b;hpb=93de5995d581240534405b7e914d5c72045d650a;p=bacula%2Fbacula diff --git a/bacula/src/cats/postgresql.c b/bacula/src/cats/postgresql.c index af7b8d312e..9407978d67 100644 --- a/bacula/src/cats/postgresql.c +++ b/bacula/src/cats/postgresql.c @@ -1,38 +1,44 @@ /* - * Bacula Catalog Database routines specific to PostgreSQL - * These are PostgreSQL specific routines - * - * Dan Langille, December 2003 - * based upon work done by Kern Sibbald, March 2000 - * - * Version $Id$ - */ + Bacula® - The Network Backup Solution -/* - Copyright (C) 2000-2004 Kern Sibbald and John Walker + Copyright (C) 2003-2010 Free Software Foundation Europe e.V. - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License as - published by the Free Software Foundation; either version 2 of - the License, or (at your option) any later version. + The main author of Bacula is Kern Sibbald, with contributions from + many others, a complete list can be found in the file AUTHORS. + This program is Free Software; you can redistribute it and/or + modify it under the terms of version two of the GNU General Public + License as published by the Free Software Foundation and included + in the file LICENSE. - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. - You should have received a copy of the GNU General Public - License along with this program; if not, write to the Free - Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, - MA 02111-1307, USA. + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. + Bacula® is a registered trademark of Kern Sibbald. + The licensor of Bacula is the Free Software Foundation Europe + (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, + Switzerland, email:ftf@fsfeurope.org. +*/ +/* + * Bacula Catalog Database routines specific to PostgreSQL + * These are PostgreSQL specific routines + * + * Dan Langille, December 2003 + * based upon work done by Kern Sibbald, March 2000 + * */ /* The following is necessary so that we do not include * the dummy external definition of DB. */ -#define __SQL_C /* indicate that this is sql.c */ +#define __SQL_C /* indicate that this is sql.c */ #include "bacula.h" #include "cats.h" @@ -40,6 +46,7 @@ #ifdef HAVE_POSTGRESQL #include "postgres_ext.h" /* needed for NAMEDATALEN */ +#include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */ /* ----------------------------------------------------------------------- * @@ -49,36 +56,54 @@ */ /* List of open databases */ -static BQUEUE db_list = {&db_list, &db_list}; +static dlist *db_list = NULL; static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +/* + * Retrieve database type + */ +const char * +db_get_type(void) +{ + return "PostgreSQL"; + +} + /* * Initialize database data structure. In principal this should * never have errors, or it is really fatal. */ B_DB * -db_init_database(JCR *jcr, char *db_name, char *db_user, char *db_password, - char *db_address, int db_port, char *db_socket) +db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password, + const char *db_address, int db_port, const char *db_socket, + int mult_db_connections) { - B_DB *mdb; + B_DB *mdb = NULL; - if (!db_user) { + if (!db_user) { Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n")); return NULL; } - P(mutex); /* lock DB queue */ - /* Look to see if DB already open */ - for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) { - if (strcmp(mdb->db_name, db_name) == 0) { - Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name); - mdb->ref_count++; - V(mutex); - return mdb; /* already open */ + P(mutex); /* lock DB queue */ + if (db_list == NULL) { + db_list = New(dlist(mdb, &mdb->link)); + } + if (!mult_db_connections) { + /* Look to see if DB already open */ + foreach_dlist(mdb, db_list) { + if (bstrcmp(mdb->db_name, db_name) && + bstrcmp(mdb->db_address, db_address) && + mdb->db_port == db_port) { + Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name); + mdb->ref_count++; + V(mutex); + return mdb; /* already open */ + } } } Dmsg0(100, "db_open first time\n"); - mdb = (B_DB *) malloc(sizeof(B_DB)); + mdb = (B_DB *)malloc(sizeof(B_DB)); memset(mdb, 0, sizeof(B_DB)); mdb->db_name = bstrdup(db_name); mdb->db_user = bstrdup(db_user); @@ -93,20 +118,54 @@ db_init_database(JCR *jcr, char *db_name, char *db_user, char *db_password, } mdb->db_port = db_port; mdb->have_insert_id = TRUE; - mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */ + mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */ *mdb->errmsg = 0; - mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */ + mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */ mdb->cached_path = get_pool_memory(PM_FNAME); mdb->cached_path_id = 0; mdb->ref_count = 1; - mdb->fname = get_pool_memory(PM_FNAME); - mdb->path = get_pool_memory(PM_FNAME); + mdb->fname = get_pool_memory(PM_FNAME); + mdb->path = get_pool_memory(PM_FNAME); mdb->esc_name = get_pool_memory(PM_FNAME); - qinsert(&db_list, &mdb->bq); /* put db in list */ + mdb->esc_path = get_pool_memory(PM_FNAME); + mdb->allow_transactions = mult_db_connections; + db_list->append(mdb); /* put db in list */ V(mutex); return mdb; } +/* Check that the database correspond to the encoding we want */ +static bool check_database_encoding(JCR *jcr, B_DB *mdb) +{ + SQL_ROW row; + int ret=false; + + if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) { + Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg); + return false; + } + + if ((row = sql_fetch_row(mdb)) == NULL) { + Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb)); + Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg); + } else { + ret = bstrcmp(row[0], "SQL_ASCII"); + + if (ret) { + /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */ + db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL); + + } else { /* something is wrong with database encoding */ + Mmsg(mdb->errmsg, + _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"), + mdb->db_name, row[0]); + Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg); + Dmsg1(50, "%s", mdb->errmsg); + } + } + return ret; +} + /* * Now actually open the database. This can generate errors, * which are returned in the errmsg @@ -124,11 +183,12 @@ db_open_database(JCR *jcr, B_DB *mdb) V(mutex); return 1; } - mdb->connected = FALSE; + mdb->connected = false; if ((errstat=rwl_init(&mdb->lock)) != 0) { - Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"), - strerror(errstat)); + berrno be; + Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"), + be.bstrerror(errstat)); V(mutex); return 0; } @@ -139,47 +199,56 @@ db_open_database(JCR *jcr, B_DB *mdb) } else { port = NULL; } - /* connect to the database */ - mdb->db = PQsetdbLogin( - mdb->db_address, /* default = localhost */ - port, /* default port */ - NULL, /* pg options */ - NULL, /* tty, ignored */ - mdb->db_name, /* database name */ - mdb->db_user, /* login name */ - mdb->db_password); /* password */ - - /* If no connect, try once more in case it is a timing problem */ - if (PQstatus(mdb->db) != CONNECTION_OK) { - mdb->db = PQsetdbLogin( - mdb->db_address, /* default = localhost */ - port, /* default port */ - NULL, /* pg options */ - NULL, /* tty, ignored */ - mdb->db_name, /* database name */ - mdb->db_user, /* login name */ - mdb->db_password); /* password */ - } - + + /* If connection fails, try at 5 sec intervals for 30 seconds. */ + for (int retry=0; retry < 6; retry++) { + /* connect to the database */ + mdb->db = PQsetdbLogin( + mdb->db_address, /* default = localhost */ + port, /* default port */ + NULL, /* pg options */ + NULL, /* tty, ignored */ + mdb->db_name, /* database name */ + mdb->db_user, /* login name */ + mdb->db_password); /* password */ + + /* If no connect, try once more in case it is a timing problem */ + if (PQstatus(mdb->db) == CONNECTION_OK) { + break; + } + bmicrosleep(5, 0); + } + Dmsg0(50, "pg_real_connect done\n"); - Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name, + Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name, mdb->db_password==NULL?"(NULL)":mdb->db_password); - + if (PQstatus(mdb->db) != CONNECTION_OK) { - Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n" - "Database=%s User=%s\n" - "It is probably not running or your password is incorrect.\n"), - mdb->db_name, mdb->db_user); + Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n" + "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"), + mdb->db_name, mdb->db_user); V(mutex); return 0; } + mdb->connected = true; + if (!check_tables_version(jcr, mdb)) { V(mutex); return 0; } - mdb->connected = TRUE; + sql_query(mdb, "SET datestyle TO 'ISO, YMD'"); + + /* tell PostgreSQL we are using standard conforming strings + and avoid warnings such as: + WARNING: nonstandard use of \\ in a string literal + */ + sql_query(mdb, "set standard_conforming_strings=on"); + + /* check that encoding is SQL_ASCII */ + check_database_encoding(jcr, mdb); + V(mutex); return 1; } @@ -190,44 +259,66 @@ db_close_database(JCR *jcr, B_DB *mdb) if (!mdb) { return; } + db_end_transaction(jcr, mdb); P(mutex); + sql_free_result(mdb); mdb->ref_count--; if (mdb->ref_count == 0) { - qdchain(&mdb->bq); + db_list->remove(mdb); if (mdb->connected && mdb->db) { - sql_close(mdb); + sql_close(mdb); } - rwl_destroy(&mdb->lock); + rwl_destroy(&mdb->lock); free_pool_memory(mdb->errmsg); free_pool_memory(mdb->cmd); free_pool_memory(mdb->cached_path); free_pool_memory(mdb->fname); free_pool_memory(mdb->path); free_pool_memory(mdb->esc_name); + free_pool_memory(mdb->esc_path); if (mdb->db_name) { - free(mdb->db_name); + free(mdb->db_name); } if (mdb->db_user) { - free(mdb->db_user); + free(mdb->db_user); } if (mdb->db_password) { - free(mdb->db_password); + free(mdb->db_password); } if (mdb->db_address) { - free(mdb->db_address); + free(mdb->db_address); } if (mdb->db_socket) { - free(mdb->db_socket); + free(mdb->db_socket); } free(mdb); + if (db_list->size() == 0) { + delete db_list; + db_list = NULL; + } } V(mutex); } +void db_check_backend_thread_safe() +{ +#ifdef HAVE_BATCH_FILE_INSERT +# ifdef HAVE_PQISTHREADSAFE + if (!PQisthreadsafe()) { + Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe " + "when using BatchMode.\n")); + } +# endif +#endif +} + +void db_thread_cleanup() +{ } + /* * Return the next unique index (auto-increment) for * the given table. Return NULL on error. - * + * * For PostgreSQL, NULL causes the auto-increment value * to be updated. */ @@ -235,105 +326,114 @@ int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index) { strcpy(index, "NULL"); return 1; -} +} /* * Escape strings so that PostgreSQL is happy * * NOTE! len is the length of the old string. Your new - * string must be long enough (max 2*old+1) to hold - * the escaped output. + * string must be long enough (max 2*old+1) to hold + * the escaped output. */ void -db_escape_string(char *snew, char *old, int len) +db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len) { - PQescapeString(snew, old, len); + int error; + + PQescapeStringConn(mdb->db, snew, old, len, &error); + if (error) { + Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n")); + /* error on encoding, probably invalid multibyte encoding in the source string + see PQescapeStringConn documentation for details. */ + Dmsg0(500, "PQescapeStringConn failed\n"); + } } /* * Submit a general SQL command (cmd), and for each row returned, * the sqlite_handler is called with the ctx. */ -int db_sql_query(B_DB *mdb, char *query, DB_RESULT_HANDLER *result_handler, void *ctx) +bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx) { SQL_ROW row; - Dmsg0(50, "db_sql_query started\n"); - + Dmsg0(500, "db_sql_query started\n"); + db_lock(mdb); if (sql_query(mdb, query) != 0) { - Mmsg(&mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb)); + Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb)); db_unlock(mdb); - Dmsg0(50, "db_sql_query failed\n"); - return 0; + Dmsg0(500, "db_sql_query failed\n"); + return false; } - Dmsg0(50, "db_sql_query succeeded. checking handler\n"); + Dmsg0(500, "db_sql_query succeeded. checking handler\n"); if (result_handler != NULL) { - Dmsg0(50, "db_sql_query invoking handler\n"); + Dmsg0(500, "db_sql_query invoking handler\n"); if ((mdb->result = sql_store_result(mdb)) != NULL) { - int num_fields = sql_num_fields(mdb); + int num_fields = sql_num_fields(mdb); - Dmsg0(50, "db_sql_query sql_store_result suceeded\n"); - while ((row = sql_fetch_row(mdb)) != NULL) { + Dmsg0(500, "db_sql_query sql_store_result suceeded\n"); + while ((row = sql_fetch_row(mdb)) != NULL) { - Dmsg0(50, "db_sql_query sql_fetch_row worked\n"); - if (result_handler(ctx, num_fields, row)) - break; - } + Dmsg0(500, "db_sql_query sql_fetch_row worked\n"); + if (result_handler(ctx, num_fields, row)) + break; + } - sql_free_result(mdb); + sql_free_result(mdb); } } db_unlock(mdb); - Dmsg0(50, "db_sql_query finished\n"); + Dmsg0(500, "db_sql_query finished\n"); - return 1; + return true; } -POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb) +POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb) { - int j; + int j; POSTGRESQL_ROW row = NULL; // by default, return NULL - Dmsg0(50, "my_postgresql_fetch_row start\n"); + Dmsg0(500, "my_postgresql_fetch_row start\n"); - if (mdb->row_number == -1 || mdb->row == NULL) { - Dmsg1(50, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields); + if (!mdb->row || mdb->row_size < mdb->num_fields) { + int num_fields = mdb->num_fields; + Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields); - if (mdb->row != NULL) { - Dmsg0(50, "my_postgresql_fetch_row freeing space\n"); - free(mdb->row); - mdb->row = NULL; + if (mdb->row) { + Dmsg0(500, "my_postgresql_fetch_row freeing space\n"); + free(mdb->row); } - - mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields); + num_fields += 20; /* add a bit extra */ + mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields); + mdb->row_size = num_fields; // now reset the row_number now that we have the space allocated mdb->row_number = 0; } // if still within the result set - if (mdb->row_number < mdb->num_rows) { - Dmsg2(50, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows); + if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) { + Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows); // get each value from this row for (j = 0; j < mdb->num_fields; j++) { - mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j); - Dmsg2(50, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]); + mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j); + Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]); } // increment the row number for the next call mdb->row_number++; row = mdb->row; } else { - Dmsg2(50, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows); + Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows); } - Dmsg1(50, "my_postgresql_fetch_row finishes returning %x\n", row); + Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row); return row; } @@ -342,55 +442,60 @@ int my_postgresql_max_length(B_DB *mdb, int field_num) { // // for a given column, find the max length // - int max_length; + int max_length; int i; - int this_length; + int this_length; max_length = 0; for (i = 0; i < mdb->num_rows; i++) { if (PQgetisnull(mdb->result, i, field_num)) { this_length = 4; // "NULL" } else { - this_length = strlen(PQgetvalue(mdb->result, i, field_num)); + this_length = cstrlen(PQgetvalue(mdb->result, i, field_num)); } - + if (max_length < this_length) { - max_length = this_length; + max_length = this_length; } } return max_length; } -POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb) +POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb) { - int i; + int i; + + Dmsg0(500, "my_postgresql_fetch_field starts\n"); - Dmsg0(50, "my_postgresql_fetch_field starts\n"); - if (mdb->fields == NULL) { - Dmsg1(50, "allocating space for %d fields\n", mdb->num_fields); + if (!mdb->fields || mdb->fields_size < mdb->num_fields) { + if (mdb->fields) { + free(mdb->fields); + } + Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields); mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields); + mdb->fields_size = mdb->num_fields; for (i = 0; i < mdb->num_fields; i++) { - Dmsg1(50, "filling field %d\n", i); - mdb->fields[i].name = PQfname(mdb->result, i); - mdb->fields[i].max_length = my_postgresql_max_length(mdb, i); - mdb->fields[i].type = PQftype(mdb->result, i); - mdb->fields[i].flags = 0; - - Dmsg4(50, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n", - mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type, - mdb->fields[i].flags); + Dmsg1(500, "filling field %d\n", i); + mdb->fields[i].name = PQfname(mdb->result, i); + mdb->fields[i].max_length = my_postgresql_max_length(mdb, i); + mdb->fields[i].type = PQftype(mdb->result, i); + mdb->fields[i].flags = 0; + + Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n", + mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type, + mdb->fields[i].flags); } // end for } // end if // increment field number for the next time around - Dmsg0(50, "my_postgresql_fetch_field finishes\n"); + Dmsg0(500, "my_postgresql_fetch_field finishes\n"); return &mdb->fields[mdb->field_number++]; } -void my_postgresql_data_seek(B_DB *mdb, int row) +void my_postgresql_data_seek(B_DB *mdb, int row) { // set the row number to be returned on the next call // to my_postgresql_fetch_row @@ -402,42 +507,78 @@ void my_postgresql_field_seek(B_DB *mdb, int field) mdb->field_number = field; } -int my_postgresql_query(B_DB *mdb, char *query) { - Dmsg0(50, "my_postgresql_query started\n"); +/* + * Note, if this routine returns 1 (failure), Bacula expects + * that no result has been stored. + * This is where QUERY_DB comes with Postgresql. + * + * Returns: 0 on success + * 1 on failure + * + */ +int my_postgresql_query(B_DB *mdb, const char *query) +{ + Dmsg0(500, "my_postgresql_query started\n"); // We are starting a new query. reset everything. mdb->num_rows = -1; mdb->row_number = -1; mdb->field_number = -1; + if (mdb->result) { + PQclear(mdb->result); /* hmm, someone forgot to free?? */ + mdb->result = NULL; + } + + Dmsg1(500, "my_postgresql_query starts with '%s'\n", query); + + for (int i=0; i < 10; i++) { + mdb->result = PQexec(mdb->db, query); + if (mdb->result) { + break; + } + bmicrosleep(5, 0); + } + if (!mdb->result) { + Dmsg1(50, "Query failed: %s\n", query); + goto bail_out; + } - Dmsg1(50, "my_postgresql_query starts with '%s'\n", query); - mdb->result = PQexec(mdb->db, query); mdb->status = PQresultStatus(mdb->result); if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) { - Dmsg1(50, "we have a result\n", query); + Dmsg1(500, "we have a result\n", query); // how many fields in the set? - mdb->num_fields = (int) PQnfields(mdb->result); - Dmsg1(50, "we have %d fields\n", mdb->num_fields); + mdb->num_fields = (int)PQnfields(mdb->result); + Dmsg1(500, "we have %d fields\n", mdb->num_fields); - mdb->num_rows = PQntuples(mdb->result); - Dmsg1(50, "we have %d rows\n", mdb->num_rows); + mdb->num_rows = PQntuples(mdb->result); + Dmsg1(500, "we have %d rows\n", mdb->num_rows); - mdb->status = 0; + mdb->row_number = 0; /* we can start to fetch something */ + mdb->status = 0; /* succeed */ } else { - Dmsg1(50, "we failed\n", query); - mdb->status = 1; + Dmsg1(50, "Result status failed: %s\n", query); + goto bail_out; } - Dmsg0(50, "my_postgresql_query finishing\n"); + Dmsg0(500, "my_postgresql_query finishing\n"); + return mdb->status; +bail_out: + Dmsg1(500, "we failed\n", query); + PQclear(mdb->result); + mdb->result = NULL; + mdb->status = 1; /* failed */ return mdb->status; } -void my_postgresql_free_result (B_DB *mdb) +void my_postgresql_free_result(B_DB *mdb) { + + db_lock(mdb); if (mdb->result) { PQclear(mdb->result); + mdb->result = NULL; } if (mdb->row) { @@ -449,14 +590,15 @@ void my_postgresql_free_result (B_DB *mdb) free(mdb->fields); mdb->fields = NULL; } + db_unlock(mdb); } -int my_postgresql_currval(B_DB *mdb, char *table_name) +int my_postgresql_currval(B_DB *mdb, const char *table_name) { // Obtain the current value of the sequence that // provides the serial value for primary key of the table. - // currval is local to our session. It is not affected by + // currval is local to our session. It is not affected by // other transactions. // Determine the name of the sequence. @@ -472,7 +614,7 @@ int my_postgresql_currval(B_DB *mdb, char *table_name) char sequence[NAMEDATALEN-1]; char query [NAMEDATALEN+50]; PGresult *result; - int id = 0; + int id = 0; if (strcasecmp(table_name, "basefiles") == 0) { bstrncpy(sequence, "basefiles_baseid", sizeof(sequence)); @@ -486,24 +628,258 @@ int my_postgresql_currval(B_DB *mdb, char *table_name) bstrncat(sequence, "_seq", sizeof(sequence)); bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence); -// Mmsg(&query, "SELECT currval('%s')", sequence); - Dmsg1(50, "my_postgresql_currval invoked with '%s'\n", query); - result = PQexec(mdb->db, query); + Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query); + for (int i=0; i < 10; i++) { + result = PQexec(mdb->db, query); + if (result) { + break; + } + bmicrosleep(5, 0); + } + if (!result) { + Dmsg1(50, "Query failed: %s\n", query); + goto bail_out; + } - Dmsg0(50, "exec done"); + Dmsg0(500, "exec done"); if (PQresultStatus(result) == PGRES_TUPLES_OK) { - Dmsg0(50, "getting value"); + Dmsg0(500, "getting value"); id = atoi(PQgetvalue(result, 0, 0)); - Dmsg2(50, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id); + Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id); } else { + Dmsg1(50, "Result status failed: %s\n", query); Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db)); } +bail_out: PQclear(result); return id; } +#ifdef HAVE_BATCH_FILE_INSERT + +int my_postgresql_batch_start(JCR *jcr, B_DB *mdb) +{ + const char *query = "COPY batch FROM STDIN"; + + Dmsg0(500, "my_postgresql_batch_start started\n"); + + if (my_postgresql_query(mdb, + "CREATE TEMPORARY TABLE batch (" + "fileindex int," + "jobid int," + "path varchar," + "name varchar," + "lstat varchar," + "md5 varchar)") == 1) + { + Dmsg0(500, "my_postgresql_batch_start failed\n"); + return 1; + } + + // We are starting a new query. reset everything. + mdb->num_rows = -1; + mdb->row_number = -1; + mdb->field_number = -1; + + my_postgresql_free_result(mdb); + + for (int i=0; i < 10; i++) { + mdb->result = PQexec(mdb->db, query); + if (mdb->result) { + break; + } + bmicrosleep(5, 0); + } + if (!mdb->result) { + Dmsg1(50, "Query failed: %s\n", query); + goto bail_out; + } + + mdb->status = PQresultStatus(mdb->result); + if (mdb->status == PGRES_COPY_IN) { + // how many fields in the set? + mdb->num_fields = (int) PQnfields(mdb->result); + mdb->num_rows = 0; + mdb->status = 1; + } else { + Dmsg1(50, "Result status failed: %s\n", query); + goto bail_out; + } + + Dmsg0(500, "my_postgresql_batch_start finishing\n"); + + return mdb->status; + +bail_out: + Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db)); + mdb->status = 0; + PQclear(mdb->result); + mdb->result = NULL; + return mdb->status; +} + +/* set error to something to abort operation */ +int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error) +{ + int res; + int count=30; + PGresult *result; + Dmsg0(500, "my_postgresql_batch_end started\n"); + + if (!mdb) { /* no files ? */ + return 0; + } + + do { + res = PQputCopyEnd(mdb->db, error); + } while (res == 0 && --count > 0); + + if (res == 1) { + Dmsg0(500, "ok\n"); + mdb->status = 1; + } + + if (res <= 0) { + Dmsg0(500, "we failed\n"); + mdb->status = 0; + Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db)); + } + + /* Check command status and return to normal libpq state */ + result = PQgetResult(mdb->db); + if (PQresultStatus(result) != PGRES_COMMAND_OK) { + Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db)); + mdb->status = 0; + } + PQclear(result); + + Dmsg0(500, "my_postgresql_batch_end finishing\n"); + + return mdb->status; +} + +int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar) +{ + int res; + int count=30; + size_t len; + const char *digest; + char ed1[50]; + + mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1); + my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl); + + mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1); + my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl); + + if (ar->Digest == NULL || ar->Digest[0] == 0) { + digest = "0"; + } else { + digest = ar->Digest; + } + + len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", + ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, + mdb->esc_name, ar->attr, digest); + + do { + res = PQputCopyData(mdb->db, + mdb->cmd, + len); + } while (res == 0 && --count > 0); + + if (res == 1) { + Dmsg0(500, "ok\n"); + mdb->changes++; + mdb->status = 1; + } + + if (res <= 0) { + Dmsg0(500, "we failed\n"); + mdb->status = 0; + Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db)); + } + + Dmsg0(500, "my_postgresql_batch_insert finishing\n"); + + return mdb->status; +} + +#endif /* HAVE_BATCH_FILE_INSERT */ + +/* + * Escape strings so that PostgreSQL is happy on COPY + * + * NOTE! len is the length of the old string. Your new + * string must be long enough (max 2*old+1) to hold + * the escaped output. + */ +char *my_postgresql_copy_escape(char *dest, char *src, size_t len) +{ + /* we have to escape \t, \n, \r, \ */ + char c = '\0' ; + + while (len > 0 && *src) { + switch (*src) { + case '\n': + c = 'n'; + break; + case '\\': + c = '\\'; + break; + case '\t': + c = 't'; + break; + case '\r': + c = 'r'; + break; + default: + c = '\0' ; + } + + if (c) { + *dest = '\\'; + dest++; + *dest = c; + } else { + *dest = *src; + } + + len--; + src++; + dest++; + } + + *dest = '\0'; + return dest; +} + +#ifdef HAVE_BATCH_FILE_INSERT +const char *my_pg_batch_lock_path_query = + "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE"; + + +const char *my_pg_batch_lock_filename_query = + "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE"; + +const char *my_pg_batch_unlock_tables_query = "COMMIT"; + +const char *my_pg_batch_fill_path_query = + "INSERT INTO Path (Path) " + "SELECT a.Path FROM " + "(SELECT DISTINCT Path FROM batch) AS a " + "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "; + + +const char *my_pg_batch_fill_filename_query = + "INSERT INTO Filename (Name) " + "SELECT a.Name FROM " + "(SELECT DISTINCT Name FROM batch) as a " + "WHERE NOT EXISTS " + "(SELECT Name FROM Filename WHERE Name = a.Name)"; +#endif /* HAVE_BATCH_FILE_INSERT */ #endif /* HAVE_POSTGRESQL */