X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Fcats%2Fpostgresql.c;h=959baf75ed239ab91f2c73307b47754834db3f9e;hb=c9657772721a3782558c4eb8b99978c7873b45fd;hp=ddcd2cda32de439128e2689306a077f5fe679bb7;hpb=df5a4b58cbe24b47d9da011684f8b01e8b730b60;p=bacula%2Fbacula diff --git a/bacula/src/cats/postgresql.c b/bacula/src/cats/postgresql.c index ddcd2cda32..959baf75ed 100644 --- a/bacula/src/cats/postgresql.c +++ b/bacula/src/cats/postgresql.c @@ -1,3 +1,30 @@ +/* + Bacula® - The Network Backup Solution + + Copyright (C) 2003-2010 Free Software Foundation Europe e.V. + + The main author of Bacula is Kern Sibbald, with contributions from + many others, a complete list can be found in the file AUTHORS. + This program is Free Software; you can redistribute it and/or + modify it under the terms of version three of the GNU Affero General Public + License as published by the Free Software Foundation and included + in the file LICENSE. + + This program is distributed in the hope that it will be useful, but + WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + 02110-1301, USA. + + Bacula® is a registered trademark of Kern Sibbald. + The licensor of Bacula is the Free Software Foundation Europe + (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, + Switzerland, email:ftf@fsfeurope.org. +*/ /* * Bacula Catalog Database routines specific to PostgreSQL * These are PostgreSQL specific routines @@ -5,21 +32,6 @@ * Dan Langille, December 2003 * based upon work done by Kern Sibbald, March 2000 * - * Version $Id$ - */ -/* - Copyright (C) 2003-2006 Kern Sibbald - - This program is free software; you can redistribute it and/or - modify it under the terms of the GNU General Public License - version 2 as amended with additional clauses defined in the - file LICENSE in the main source directory. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - the file LICENSE for additional details. - */ @@ -34,6 +46,7 @@ #ifdef HAVE_POSTGRESQL #include "postgres_ext.h" /* needed for NAMEDATALEN */ +#include "pg_config_manual.h" /* get NAMEDATALEN on version 8.3 or later */ /* ----------------------------------------------------------------------- * @@ -43,10 +56,20 @@ */ /* List of open databases */ -static BQUEUE db_list = {&db_list, &db_list}; +static dlist *db_list = NULL; static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +/* + * Retrieve database type + */ +const char * +db_get_type(void) +{ + return "PostgreSQL"; + +} + /* * Initialize database data structure. In principal this should * never have errors, or it is really fatal. @@ -56,17 +79,22 @@ db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char const char *db_address, int db_port, const char *db_socket, int mult_db_connections) { - B_DB *mdb; + B_DB *mdb = NULL; if (!db_user) { Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n")); return NULL; } P(mutex); /* lock DB queue */ + if (db_list == NULL) { + db_list = New(dlist(mdb, &mdb->link)); + } if (!mult_db_connections) { /* Look to see if DB already open */ - for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) { - if (strcmp(mdb->db_name, db_name) == 0) { + foreach_dlist(mdb, db_list) { + if (bstrcmp(mdb->db_name, db_name) && + bstrcmp(mdb->db_address, db_address) && + mdb->db_port == db_port) { Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name); mdb->ref_count++; V(mutex); @@ -89,7 +117,6 @@ db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char mdb->db_socket = bstrdup(db_socket); } mdb->db_port = db_port; - mdb->have_insert_id = TRUE; mdb->errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */ *mdb->errmsg = 0; mdb->cmd = get_pool_memory(PM_EMSG); /* get command buffer */ @@ -99,12 +126,45 @@ db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char mdb->fname = get_pool_memory(PM_FNAME); mdb->path = get_pool_memory(PM_FNAME); mdb->esc_name = get_pool_memory(PM_FNAME); + mdb->esc_path = get_pool_memory(PM_FNAME); mdb->allow_transactions = mult_db_connections; - qinsert(&db_list, &mdb->bq); /* put db in list */ + db_list->append(mdb); /* put db in list */ V(mutex); return mdb; } +/* Check that the database correspond to the encoding we want */ +static bool check_database_encoding(JCR *jcr, B_DB *mdb) +{ + SQL_ROW row; + int ret=false; + + if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) { + Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg); + return false; + } + + if ((row = sql_fetch_row(mdb)) == NULL) { + Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb)); + Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg); + } else { + ret = bstrcmp(row[0], "SQL_ASCII"); + + if (ret) { + /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */ + db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL); + + } else { /* something is wrong with database encoding */ + Mmsg(mdb->errmsg, + _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"), + mdb->db_name, row[0]); + Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg); + Dmsg1(50, "%s", mdb->errmsg); + } + } + return ret; +} + /* * Now actually open the database. This can generate errors, * which are returned in the errmsg @@ -125,8 +185,9 @@ db_open_database(JCR *jcr, B_DB *mdb) mdb->connected = false; if ((errstat=rwl_init(&mdb->lock)) != 0) { + berrno be; Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"), - strerror(errstat)); + be.bstrerror(errstat)); V(mutex); return 0; } @@ -162,22 +223,31 @@ db_open_database(JCR *jcr, B_DB *mdb) mdb->db_password==NULL?"(NULL)":mdb->db_password); if (PQstatus(mdb->db) != CONNECTION_OK) { - Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n" - "Database=%s User=%s\n" - "It is probably not running or your password is incorrect.\n"), - mdb->db_name, mdb->db_user); + Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n" + "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"), + mdb->db_name, mdb->db_user); V(mutex); return 0; } + mdb->connected = true; + if (!check_tables_version(jcr, mdb)) { V(mutex); return 0; } sql_query(mdb, "SET datestyle TO 'ISO, YMD'"); + + /* tell PostgreSQL we are using standard conforming strings + and avoid warnings such as: + WARNING: nonstandard use of \\ in a string literal + */ + sql_query(mdb, "set standard_conforming_strings=on"); + + /* check that encoding is SQL_ASCII */ + check_database_encoding(jcr, mdb); - mdb->connected = true; V(mutex); return 1; } @@ -190,9 +260,10 @@ db_close_database(JCR *jcr, B_DB *mdb) } db_end_transaction(jcr, mdb); P(mutex); + sql_free_result(mdb); mdb->ref_count--; if (mdb->ref_count == 0) { - qdchain(&mdb->bq); + db_list->remove(mdb); if (mdb->connected && mdb->db) { sql_close(mdb); } @@ -203,6 +274,7 @@ db_close_database(JCR *jcr, B_DB *mdb) free_pool_memory(mdb->fname); free_pool_memory(mdb->path); free_pool_memory(mdb->esc_name); + free_pool_memory(mdb->esc_path); if (mdb->db_name) { free(mdb->db_name); } @@ -218,12 +290,33 @@ db_close_database(JCR *jcr, B_DB *mdb) if (mdb->db_socket) { free(mdb->db_socket); } - my_postgresql_free_result(mdb); + if (mdb->esc_obj) { + PQfreemem(mdb->esc_obj); + } free(mdb); + if (db_list->size() == 0) { + delete db_list; + db_list = NULL; + } } V(mutex); } +void db_check_backend_thread_safe() +{ +#ifdef HAVE_BATCH_FILE_INSERT +# ifdef HAVE_PQISTHREADSAFE + if (!PQisthreadsafe()) { + Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe " + "when using BatchMode.\n")); + } +# endif +#endif +} + +void db_thread_cleanup() +{ } + /* * Return the next unique index (auto-increment) for * the given table. Return NULL on error. @@ -237,6 +330,61 @@ int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index) return 1; } +/* + * Escape binary so that PostgreSQL is happy + * + */ +char * +db_escape_object(JCR *jcr, B_DB *mdb, char *old, int len) +{ + size_t new_len; + if (mdb->esc_obj) { + PQfreemem(mdb->esc_obj); + } + + mdb->esc_obj = PQescapeByteaConn(mdb->db, (unsigned const char *)old, + len, &new_len); + + if (!mdb->esc_obj) { + Jmsg(jcr, M_FATAL, 0, _("PQescapeByteaConn returned NULL.\n")); + } + + return (char *)mdb->esc_obj; +} + +/* + * Unescape binary object so that PostgreSQL is happy + * + */ +void +db_unescape_object(JCR *jcr, B_DB *mdb, + char *from, int32_t expected_len, + POOLMEM **dest, int32_t *dest_len) +{ + size_t new_len; + unsigned char *obj; + + if (!from) { + *dest[0] = 0; + *dest_len = 0; + return; + } + + obj = PQunescapeBytea((unsigned const char *)from, &new_len); + + if (!obj) { + Jmsg(jcr, M_FATAL, 0, _("PQunescapeByteaConn returned NULL.\n")); + } + + *dest_len = new_len; + *dest = check_pool_memory_size(*dest, new_len+1); + memcpy(*dest, obj, new_len); + (*dest)[new_len]=0; + + PQfreemem(obj); + + Dmsg1(010, "obj size: %d\n", *dest_len); +} /* * Escape strings so that PostgreSQL is happy @@ -246,16 +394,24 @@ int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index) * the escaped output. */ void -db_escape_string(char *snew, char *old, int len) +db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len) { - PQescapeString(snew, old, len); + int error; + + PQescapeStringConn(mdb->db, snew, old, len, &error); + if (error) { + Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n")); + /* error on encoding, probably invalid multibyte encoding in the source string + see PQescapeStringConn documentation for details. */ + Dmsg0(500, "PQescapeStringConn failed\n"); + } } /* * Submit a general SQL command (cmd), and for each row returned, * the sqlite_handler is called with the ctx. */ -int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx) +bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx) { SQL_ROW row; @@ -266,7 +422,7 @@ int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb)); db_unlock(mdb); Dmsg0(500, "db_sql_query failed\n"); - return 0; + return false; } Dmsg0(500, "db_sql_query succeeded. checking handler\n"); @@ -290,7 +446,7 @@ int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler Dmsg0(500, "db_sql_query finished\n"); - return 1; + return true; } @@ -302,23 +458,24 @@ POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb) Dmsg0(500, "my_postgresql_fetch_row start\n"); - if (mdb->row_number == -1 || mdb->row == NULL) { + if (!mdb->row || mdb->row_size < mdb->num_fields) { + int num_fields = mdb->num_fields; Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields); - if (mdb->row != NULL) { + if (mdb->row) { Dmsg0(500, "my_postgresql_fetch_row freeing space\n"); free(mdb->row); - mdb->row = NULL; } - - mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields); + num_fields += 20; /* add a bit extra */ + mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields); + mdb->row_size = num_fields; // now reset the row_number now that we have the space allocated mdb->row_number = 0; } // if still within the result set - if (mdb->row_number < mdb->num_rows) { + if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) { Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows); // get each value from this row for (j = 0; j < mdb->num_fields; j++) { @@ -333,7 +490,7 @@ POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb) Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows); } - Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row); + Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row); return row; } @@ -367,9 +524,14 @@ POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb) int i; Dmsg0(500, "my_postgresql_fetch_field starts\n"); - if (mdb->fields == NULL) { + + if (!mdb->fields || mdb->fields_size < mdb->num_fields) { + if (mdb->fields) { + free(mdb->fields); + } Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields); mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields); + mdb->fields_size = mdb->num_fields; for (i = 0; i < mdb->num_fields; i++) { Dmsg1(500, "filling field %d\n", i); @@ -405,44 +567,72 @@ void my_postgresql_field_seek(B_DB *mdb, int field) /* * Note, if this routine returns 1 (failure), Bacula expects * that no result has been stored. + * This is where QUERY_DB comes with Postgresql. + * + * Returns: 0 on success + * 1 on failure + * */ -int my_postgresql_query(B_DB *mdb, const char *query) { +int my_postgresql_query(B_DB *mdb, const char *query) +{ Dmsg0(500, "my_postgresql_query started\n"); // We are starting a new query. reset everything. mdb->num_rows = -1; mdb->row_number = -1; mdb->field_number = -1; - if (mdb->result != NULL) { + if (mdb->result) { PQclear(mdb->result); /* hmm, someone forgot to free?? */ + mdb->result = NULL; } Dmsg1(500, "my_postgresql_query starts with '%s'\n", query); - mdb->result = PQexec(mdb->db, query); + + for (int i=0; i < 10; i++) { + mdb->result = PQexec(mdb->db, query); + if (mdb->result) { + break; + } + bmicrosleep(5, 0); + } + if (!mdb->result) { + Dmsg1(50, "Query failed: %s\n", query); + goto bail_out; + } + mdb->status = PQresultStatus(mdb->result); if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) { Dmsg1(500, "we have a result\n", query); // how many fields in the set? - mdb->num_fields = (int) PQnfields(mdb->result); + mdb->num_fields = (int)PQnfields(mdb->result); Dmsg1(500, "we have %d fields\n", mdb->num_fields); - mdb->num_rows = PQntuples(mdb->result); + mdb->num_rows = PQntuples(mdb->result); Dmsg1(500, "we have %d rows\n", mdb->num_rows); - mdb->status = 0; + mdb->row_number = 0; /* we can start to fetch something */ + mdb->status = 0; /* succeed */ } else { - Dmsg1(500, "we failed\n", query); - mdb->status = 1; + Dmsg1(50, "Result status failed: %s\n", query); + goto bail_out; } Dmsg0(500, "my_postgresql_query finishing\n"); + return mdb->status; +bail_out: + Dmsg1(500, "we failed\n", query); + PQclear(mdb->result); + mdb->result = NULL; + mdb->status = 1; /* failed */ return mdb->status; } -void my_postgresql_free_result (B_DB *mdb) +void my_postgresql_free_result(B_DB *mdb) { + + db_lock(mdb); if (mdb->result) { PQclear(mdb->result); mdb->result = NULL; @@ -457,9 +647,10 @@ void my_postgresql_free_result (B_DB *mdb) free(mdb->fields); mdb->fields = NULL; } + db_unlock(mdb); } -int my_postgresql_currval(B_DB *mdb, char *table_name) +static uint64_t my_postgresql_currval(B_DB *mdb, const char *table_name) { // Obtain the current value of the sequence that // provides the serial value for primary key of the table. @@ -480,7 +671,7 @@ int my_postgresql_currval(B_DB *mdb, char *table_name) char sequence[NAMEDATALEN-1]; char query [NAMEDATALEN+50]; PGresult *result; - int id = 0; + uint64_t id = 0; if (strcasecmp(table_name, "basefiles") == 0) { bstrncpy(sequence, "basefiles_baseid", sizeof(sequence)); @@ -494,24 +685,277 @@ int my_postgresql_currval(B_DB *mdb, char *table_name) bstrncat(sequence, "_seq", sizeof(sequence)); bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence); -// Mmsg(query, "SELECT currval('%s')", sequence); Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query); - result = PQexec(mdb->db, query); + for (int i=0; i < 10; i++) { + result = PQexec(mdb->db, query); + if (result) { + break; + } + bmicrosleep(5, 0); + } + if (!result) { + Dmsg1(50, "Query failed: %s\n", query); + goto bail_out; + } Dmsg0(500, "exec done"); if (PQresultStatus(result) == PGRES_TUPLES_OK) { Dmsg0(500, "getting value"); - id = atoi(PQgetvalue(result, 0, 0)); + id = str_to_uint64(PQgetvalue(result, 0, 0)); Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id); } else { + Dmsg1(50, "Result status failed: %s\n", query); Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db)); } +bail_out: PQclear(result); - + return id; } +uint64_t my_postgresql_insert_autokey_record(B_DB *mdb, const char *query, const char *table_name) +{ + /* + * First execute the insert query and then retrieve the currval. + */ + if (my_postgresql_query(mdb, query)) { + return 0; + } + + mdb->num_rows = sql_affected_rows(mdb); + if (mdb->num_rows != 1) { + return 0; + } + + mdb->changes++; + + return my_postgresql_currval(mdb, table_name); +} + +#ifdef HAVE_BATCH_FILE_INSERT + +int my_postgresql_batch_start(JCR *jcr, B_DB *mdb) +{ + const char *query = "COPY batch FROM STDIN"; + + Dmsg0(500, "my_postgresql_batch_start started\n"); + + if (my_postgresql_query(mdb, + "CREATE TEMPORARY TABLE batch (" + "fileindex int," + "jobid int," + "path varchar," + "name varchar," + "lstat varchar," + "md5 varchar)") == 1) + { + Dmsg0(500, "my_postgresql_batch_start failed\n"); + return 1; + } + + // We are starting a new query. reset everything. + mdb->num_rows = -1; + mdb->row_number = -1; + mdb->field_number = -1; + + my_postgresql_free_result(mdb); + + for (int i=0; i < 10; i++) { + mdb->result = PQexec(mdb->db, query); + if (mdb->result) { + break; + } + bmicrosleep(5, 0); + } + if (!mdb->result) { + Dmsg1(50, "Query failed: %s\n", query); + goto bail_out; + } + + mdb->status = PQresultStatus(mdb->result); + if (mdb->status == PGRES_COPY_IN) { + // how many fields in the set? + mdb->num_fields = (int) PQnfields(mdb->result); + mdb->num_rows = 0; + mdb->status = 1; + } else { + Dmsg1(50, "Result status failed: %s\n", query); + goto bail_out; + } + + Dmsg0(500, "my_postgresql_batch_start finishing\n"); + + return mdb->status; + +bail_out: + Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db)); + mdb->status = 0; + PQclear(mdb->result); + mdb->result = NULL; + return mdb->status; +} + +/* set error to something to abort operation */ +int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error) +{ + int res; + int count=30; + PGresult *result; + Dmsg0(500, "my_postgresql_batch_end started\n"); + + if (!mdb) { /* no files ? */ + return 0; + } + + do { + res = PQputCopyEnd(mdb->db, error); + } while (res == 0 && --count > 0); + + if (res == 1) { + Dmsg0(500, "ok\n"); + mdb->status = 1; + } + + if (res <= 0) { + Dmsg0(500, "we failed\n"); + mdb->status = 0; + Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db)); + } + + /* Check command status and return to normal libpq state */ + result = PQgetResult(mdb->db); + if (PQresultStatus(result) != PGRES_COMMAND_OK) { + Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db)); + mdb->status = 0; + } + PQclear(result); + + Dmsg0(500, "my_postgresql_batch_end finishing\n"); + + return mdb->status; +} + +int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar) +{ + int res; + int count=30; + size_t len; + const char *digest; + char ed1[50]; + + mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1); + my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl); + + mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1); + my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl); + + if (ar->Digest == NULL || ar->Digest[0] == 0) { + digest = "0"; + } else { + digest = ar->Digest; + } + + len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", + ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, + mdb->esc_name, ar->attr, digest); + + do { + res = PQputCopyData(mdb->db, + mdb->cmd, + len); + } while (res == 0 && --count > 0); + + if (res == 1) { + Dmsg0(500, "ok\n"); + mdb->changes++; + mdb->status = 1; + } + + if (res <= 0) { + Dmsg0(500, "we failed\n"); + mdb->status = 0; + Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db)); + } + + Dmsg0(500, "my_postgresql_batch_insert finishing\n"); + + return mdb->status; +} + +#endif /* HAVE_BATCH_FILE_INSERT */ + +/* + * Escape strings so that PostgreSQL is happy on COPY + * + * NOTE! len is the length of the old string. Your new + * string must be long enough (max 2*old+1) to hold + * the escaped output. + */ +char *my_postgresql_copy_escape(char *dest, char *src, size_t len) +{ + /* we have to escape \t, \n, \r, \ */ + char c = '\0' ; + + while (len > 0 && *src) { + switch (*src) { + case '\n': + c = 'n'; + break; + case '\\': + c = '\\'; + break; + case '\t': + c = 't'; + break; + case '\r': + c = 'r'; + break; + default: + c = '\0' ; + } + + if (c) { + *dest = '\\'; + dest++; + *dest = c; + } else { + *dest = *src; + } + + len--; + src++; + dest++; + } + + *dest = '\0'; + return dest; +} + +#ifdef HAVE_BATCH_FILE_INSERT +const char *my_pg_batch_lock_path_query = + "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE"; + + +const char *my_pg_batch_lock_filename_query = + "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE"; + +const char *my_pg_batch_unlock_tables_query = "COMMIT"; + +const char *my_pg_batch_fill_path_query = + "INSERT INTO Path (Path) " + "SELECT a.Path FROM " + "(SELECT DISTINCT Path FROM batch) AS a " + "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "; + + +const char *my_pg_batch_fill_filename_query = + "INSERT INTO Filename (Name) " + "SELECT a.Name FROM " + "(SELECT DISTINCT Name FROM batch) as a " + "WHERE NOT EXISTS " + "(SELECT Name FROM Filename WHERE Name = a.Name)"; +#endif /* HAVE_BATCH_FILE_INSERT */ #endif /* HAVE_POSTGRESQL */