]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Tweak mutex order for SD
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
51
52 /* -----------------------------------------------------------------------
53  *
54  *   PostgreSQL dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "PostgreSQL";
71
72 }
73
74 /*
75  * Initialize database data structure. In principal this should
76  * never have errors, or it is really fatal.
77  */
78 B_DB *
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80                  const char *db_address, int db_port, const char *db_socket,
81                  int mult_db_connections)
82 {
83    B_DB *mdb;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101       }
102    }
103    Dmsg0(100, "db_open first time\n");
104    mdb = (B_DB *)malloc(sizeof(B_DB));
105    memset(mdb, 0, sizeof(B_DB));
106    mdb->db_name = bstrdup(db_name);
107    mdb->db_user = bstrdup(db_user);
108    if (db_password) {
109       mdb->db_password = bstrdup(db_password);
110    }
111    if (db_address) {
112       mdb->db_address  = bstrdup(db_address);
113    }
114    if (db_socket) {
115       mdb->db_socket   = bstrdup(db_socket);
116    }
117    mdb->db_port        = db_port;
118    mdb->have_insert_id = TRUE;
119    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
120    *mdb->errmsg        = 0;
121    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
122    mdb->cached_path    = get_pool_memory(PM_FNAME);
123    mdb->cached_path_id = 0;
124    mdb->ref_count      = 1;
125    mdb->fname          = get_pool_memory(PM_FNAME);
126    mdb->path           = get_pool_memory(PM_FNAME);
127    mdb->esc_name       = get_pool_memory(PM_FNAME);
128    mdb->esc_path      = get_pool_memory(PM_FNAME);
129    mdb->allow_transactions = mult_db_connections;
130    qinsert(&db_list, &mdb->bq);            /* put db in list */
131    V(mutex);
132    return mdb;
133 }
134
135 /* Check that the database correspond to the encoding we want */
136 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
137 {
138    SQL_ROW row;
139    int ret=false;
140
141    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
142       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
143       return false;
144    }
145
146    if ((row = sql_fetch_row(mdb)) == NULL) {
147       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
148       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
149    } else {
150       ret = bstrcmp(row[0], "SQL_ASCII");
151
152       if (ret) {
153          /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
154          db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
155
156       } else {                  /* something is wrong with database encoding */
157          Mmsg(mdb->errmsg, 
158               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
159               mdb->db_name, row[0]);
160          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
161          Dmsg1(50, "%s", mdb->errmsg);
162       } 
163    }
164    return ret;
165 }
166
167 /*
168  * Now actually open the database.  This can generate errors,
169  *   which are returned in the errmsg
170  *
171  * DO NOT close the database or free(mdb) here !!!!
172  */
173 int
174 db_open_database(JCR *jcr, B_DB *mdb)
175 {
176    int errstat;
177    char buf[10], *port;
178
179 #ifdef xxx                      /* require libpq >= 8.2 */
180    if (!PQisthreadsafe()) {
181       Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
182            "PostgreSQL library is not thread safe. Cannot continue.\n"));
183    }
184 #endif
185    P(mutex);
186    if (mdb->connected) {
187       V(mutex);
188       return 1;
189    }
190    mdb->connected = false;
191
192    if ((errstat=rwl_init(&mdb->lock)) != 0) {
193       berrno be;
194       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
195             be.bstrerror(errstat));
196       V(mutex);
197       return 0;
198    }
199
200    if (mdb->db_port) {
201       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
202       port = buf;
203    } else {
204       port = NULL;
205    }
206
207    /* If connection fails, try at 5 sec intervals for 30 seconds. */
208    for (int retry=0; retry < 6; retry++) {
209       /* connect to the database */
210       mdb->db = PQsetdbLogin(
211            mdb->db_address,           /* default = localhost */
212            port,                      /* default port */
213            NULL,                      /* pg options */
214            NULL,                      /* tty, ignored */
215            mdb->db_name,              /* database name */
216            mdb->db_user,              /* login name */
217            mdb->db_password);         /* password */
218
219       /* If no connect, try once more in case it is a timing problem */
220       if (PQstatus(mdb->db) == CONNECTION_OK) {
221          break;
222       }
223       bmicrosleep(5, 0);
224    }
225
226    Dmsg0(50, "pg_real_connect done\n");
227    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
228             mdb->db_password==NULL?"(NULL)":mdb->db_password);
229
230    if (PQstatus(mdb->db) != CONNECTION_OK) {
231       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
232          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
233          mdb->db_name, mdb->db_user);
234       V(mutex);
235       return 0;
236    }
237
238    mdb->connected = true;
239
240    if (!check_tables_version(jcr, mdb)) {
241       V(mutex);
242       return 0;
243    }
244
245    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
246    
247    /* tell PostgreSQL we are using standard conforming strings
248       and avoid warnings such as:
249        WARNING:  nonstandard use of \\ in a string literal
250    */
251    sql_query(mdb, "set standard_conforming_strings=on");
252
253    /* check that encoding is SQL_ASCII */
254    check_database_encoding(jcr, mdb);
255
256    V(mutex);
257    return 1;
258 }
259
260 void
261 db_close_database(JCR *jcr, B_DB *mdb)
262 {
263    if (!mdb) {
264       return;
265    }
266    db_end_transaction(jcr, mdb);
267    P(mutex);
268    sql_free_result(mdb);
269    mdb->ref_count--;
270    if (mdb->ref_count == 0) {
271       qdchain(&mdb->bq);
272       if (mdb->connected && mdb->db) {
273          sql_close(mdb);
274       }
275       rwl_destroy(&mdb->lock);
276       free_pool_memory(mdb->errmsg);
277       free_pool_memory(mdb->cmd);
278       free_pool_memory(mdb->cached_path);
279       free_pool_memory(mdb->fname);
280       free_pool_memory(mdb->path);
281       free_pool_memory(mdb->esc_name);
282       free_pool_memory(mdb->esc_path);
283       if (mdb->db_name) {
284          free(mdb->db_name);
285       }
286       if (mdb->db_user) {
287          free(mdb->db_user);
288       }
289       if (mdb->db_password) {
290          free(mdb->db_password);
291       }
292       if (mdb->db_address) {
293          free(mdb->db_address);
294       }
295       if (mdb->db_socket) {
296          free(mdb->db_socket);
297       }
298       free(mdb);
299    }
300    V(mutex);
301 }
302
303 void db_thread_cleanup()
304 { }
305
306 /*
307  * Return the next unique index (auto-increment) for
308  * the given table.  Return NULL on error.
309  *
310  * For PostgreSQL, NULL causes the auto-increment value
311  *  to be updated.
312  */
313 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
314 {
315    strcpy(index, "NULL");
316    return 1;
317 }
318
319
320 /*
321  * Escape strings so that PostgreSQL is happy
322  *
323  *   NOTE! len is the length of the old string. Your new
324  *         string must be long enough (max 2*old+1) to hold
325  *         the escaped output.
326  */
327 void
328 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
329 {
330    int error;
331   
332    PQescapeStringConn(mdb->db, snew, old, len, &error);
333    if (error) {
334       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
335       /* error on encoding, probably invalid multibyte encoding in the source string
336         see PQescapeStringConn documentation for details. */
337       Dmsg0(500, "PQescapeStringConn failed\n");
338    }
339 }
340
341 /*
342  * Submit a general SQL command (cmd), and for each row returned,
343  *  the sqlite_handler is called with the ctx.
344  */
345 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
346 {
347    SQL_ROW row;
348
349    Dmsg0(500, "db_sql_query started\n");
350
351    db_lock(mdb);
352    if (sql_query(mdb, query) != 0) {
353       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
354       db_unlock(mdb);
355       Dmsg0(500, "db_sql_query failed\n");
356       return false;
357    }
358    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
359
360    if (result_handler != NULL) {
361       Dmsg0(500, "db_sql_query invoking handler\n");
362       if ((mdb->result = sql_store_result(mdb)) != NULL) {
363          int num_fields = sql_num_fields(mdb);
364
365          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
366          while ((row = sql_fetch_row(mdb)) != NULL) {
367
368             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
369             if (result_handler(ctx, num_fields, row))
370                break;
371          }
372
373         sql_free_result(mdb);
374       }
375    }
376    db_unlock(mdb);
377
378    Dmsg0(500, "db_sql_query finished\n");
379
380    return true;
381 }
382
383
384
385 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
386 {
387    int j;
388    POSTGRESQL_ROW row = NULL; // by default, return NULL
389
390    Dmsg0(500, "my_postgresql_fetch_row start\n");
391
392    if (!mdb->row || mdb->row_size < mdb->num_fields) {
393       int num_fields = mdb->num_fields;
394       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
395
396       if (mdb->row) {
397          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
398          free(mdb->row);
399       }
400       num_fields += 20;                  /* add a bit extra */
401       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
402       mdb->row_size = num_fields;
403
404       // now reset the row_number now that we have the space allocated
405       mdb->row_number = 0;
406    }
407
408    // if still within the result set
409    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
410       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
411       // get each value from this row
412       for (j = 0; j < mdb->num_fields; j++) {
413          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
414          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
415       }
416       // increment the row number for the next call
417       mdb->row_number++;
418
419       row = mdb->row;
420    } else {
421       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
422    }
423
424    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
425
426    return row;
427 }
428
429 int my_postgresql_max_length(B_DB *mdb, int field_num) {
430    //
431    // for a given column, find the max length
432    //
433    int max_length;
434    int i;
435    int this_length;
436
437    max_length = 0;
438    for (i = 0; i < mdb->num_rows; i++) {
439       if (PQgetisnull(mdb->result, i, field_num)) {
440           this_length = 4;        // "NULL"
441       } else {
442           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
443       }
444
445       if (max_length < this_length) {
446           max_length = this_length;
447       }
448    }
449
450    return max_length;
451 }
452
453 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
454 {
455    int     i;
456
457    Dmsg0(500, "my_postgresql_fetch_field starts\n");
458
459    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
460       if (mdb->fields) {
461          free(mdb->fields);
462       }
463       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
464       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
465       mdb->fields_size = mdb->num_fields;
466
467       for (i = 0; i < mdb->num_fields; i++) {
468          Dmsg1(500, "filling field %d\n", i);
469          mdb->fields[i].name           = PQfname(mdb->result, i);
470          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
471          mdb->fields[i].type       = PQftype(mdb->result, i);
472          mdb->fields[i].flags      = 0;
473
474          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
475             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
476             mdb->fields[i].flags);
477       } // end for
478    } // end if
479
480    // increment field number for the next time around
481
482    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
483    return &mdb->fields[mdb->field_number++];
484 }
485
486 void my_postgresql_data_seek(B_DB *mdb, int row)
487 {
488    // set the row number to be returned on the next call
489    // to my_postgresql_fetch_row
490    mdb->row_number = row;
491 }
492
493 void my_postgresql_field_seek(B_DB *mdb, int field)
494 {
495    mdb->field_number = field;
496 }
497
498 /*
499  * Note, if this routine returns 1 (failure), Bacula expects
500  *  that no result has been stored.
501  * This is where QUERY_DB comes with Postgresql.
502  *
503  *  Returns:  0  on success
504  *            1  on failure
505  *
506  */
507 int my_postgresql_query(B_DB *mdb, const char *query)
508 {
509    Dmsg0(500, "my_postgresql_query started\n");
510    // We are starting a new query.  reset everything.
511    mdb->num_rows     = -1;
512    mdb->row_number   = -1;
513    mdb->field_number = -1;
514
515    if (mdb->result) {
516       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
517       mdb->result = NULL;
518    }
519
520    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
521
522    for (int i=0; i < 10; i++) {
523       mdb->result = PQexec(mdb->db, query);
524       if (mdb->result) {
525          break;
526       }
527       bmicrosleep(5, 0);
528    }
529    if (!mdb->result) {
530       Dmsg1(50, "Query failed: %s\n", query);
531       goto bail_out;
532    }
533
534    mdb->status = PQresultStatus(mdb->result);
535    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
536       Dmsg1(500, "we have a result\n", query);
537
538       // how many fields in the set?
539       mdb->num_fields = (int)PQnfields(mdb->result);
540       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
541
542       mdb->num_rows = PQntuples(mdb->result);
543       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
544
545       mdb->row_number = 0;      /* we can start to fetch something */
546       mdb->status = 0;          /* succeed */
547    } else {
548       Dmsg1(50, "Result status failed: %s\n", query);
549       goto bail_out;
550    }
551
552    Dmsg0(500, "my_postgresql_query finishing\n");
553    return mdb->status;
554
555 bail_out:
556    Dmsg1(500, "we failed\n", query);
557    PQclear(mdb->result);
558    mdb->result = NULL;
559    mdb->status = 1;                   /* failed */
560    return mdb->status;
561 }
562
563 void my_postgresql_free_result(B_DB *mdb)
564 {
565    
566    db_lock(mdb);
567    if (mdb->result) {
568       PQclear(mdb->result);
569       mdb->result = NULL;
570    }
571
572    if (mdb->row) {
573       free(mdb->row);
574       mdb->row = NULL;
575    }
576
577    if (mdb->fields) {
578       free(mdb->fields);
579       mdb->fields = NULL;
580    }
581    db_unlock(mdb);
582 }
583
584 int my_postgresql_currval(B_DB *mdb, const char *table_name)
585 {
586    // Obtain the current value of the sequence that
587    // provides the serial value for primary key of the table.
588
589    // currval is local to our session.  It is not affected by
590    // other transactions.
591
592    // Determine the name of the sequence.
593    // PostgreSQL automatically creates a sequence using
594    // <table>_<column>_seq.
595    // At the time of writing, all tables used this format for
596    // for their primary key: <table>id
597    // Except for basefiles which has a primary key on baseid.
598    // Therefore, we need to special case that one table.
599
600    // everything else can use the PostgreSQL formula.
601
602    char      sequence[NAMEDATALEN-1];
603    char      query   [NAMEDATALEN+50];
604    PGresult *result;
605    int       id = 0;
606
607    if (strcasecmp(table_name, "basefiles") == 0) {
608       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
609    } else {
610       bstrncpy(sequence, table_name, sizeof(sequence));
611       bstrncat(sequence, "_",        sizeof(sequence));
612       bstrncat(sequence, table_name, sizeof(sequence));
613       bstrncat(sequence, "id",       sizeof(sequence));
614    }
615
616    bstrncat(sequence, "_seq", sizeof(sequence));
617    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
618
619    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
620    for (int i=0; i < 10; i++) {
621       result = PQexec(mdb->db, query);
622       if (result) {
623          break;
624       }
625       bmicrosleep(5, 0);
626    }
627    if (!result) {
628       Dmsg1(50, "Query failed: %s\n", query);
629       goto bail_out;
630    }
631
632    Dmsg0(500, "exec done");
633
634    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
635       Dmsg0(500, "getting value");
636       id = atoi(PQgetvalue(result, 0, 0));
637       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
638    } else {
639       Dmsg1(50, "Result status failed: %s\n", query);
640       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
641    }
642
643 bail_out:
644    PQclear(result);
645
646    return id;
647 }
648
649 #ifdef HAVE_BATCH_FILE_INSERT
650
651 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
652 {
653    const char *query = "COPY batch FROM STDIN";
654
655    Dmsg0(500, "my_postgresql_batch_start started\n");
656
657    if (my_postgresql_query(mdb,
658                            "CREATE TEMPORARY TABLE batch ("
659                                "fileindex int,"
660                                "jobid int,"
661                                "path varchar,"
662                                "name varchar,"
663                                "lstat varchar,"
664                                "md5 varchar)") == 1)
665    {
666       Dmsg0(500, "my_postgresql_batch_start failed\n");
667       return 1;
668    }
669    
670    // We are starting a new query.  reset everything.
671    mdb->num_rows     = -1;
672    mdb->row_number   = -1;
673    mdb->field_number = -1;
674
675    my_postgresql_free_result(mdb);
676
677    for (int i=0; i < 10; i++) {
678       mdb->result = PQexec(mdb->db, query);
679       if (mdb->result) {
680          break;
681       }
682       bmicrosleep(5, 0);
683    }
684    if (!mdb->result) {
685       Dmsg1(50, "Query failed: %s\n", query);
686       goto bail_out;
687    }
688
689    mdb->status = PQresultStatus(mdb->result);
690    if (mdb->status == PGRES_COPY_IN) {
691       // how many fields in the set?
692       mdb->num_fields = (int) PQnfields(mdb->result);
693       mdb->num_rows   = 0;
694       mdb->status = 1;
695    } else {
696       Dmsg1(50, "Result status failed: %s\n", query);
697       goto bail_out;
698    }
699
700    Dmsg0(500, "my_postgresql_batch_start finishing\n");
701
702    return mdb->status;
703
704 bail_out:
705    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
706    mdb->status = 0;
707    PQclear(mdb->result);
708    mdb->result = NULL;
709    return mdb->status;
710 }
711
712 /* set error to something to abort operation */
713 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
714 {
715    int res;
716    int count=30;
717    PGresult *result;
718    Dmsg0(500, "my_postgresql_batch_end started\n");
719
720    if (!mdb) {                  /* no files ? */
721       return 0;
722    }
723
724    do { 
725       res = PQputCopyEnd(mdb->db, error);
726    } while (res == 0 && --count > 0);
727
728    if (res == 1) {
729       Dmsg0(500, "ok\n");
730       mdb->status = 1;
731    }
732    
733    if (res <= 0) {
734       Dmsg0(500, "we failed\n");
735       mdb->status = 0;
736       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
737    }
738
739    /* Check command status and return to normal libpq state */
740    result = PQgetResult(mdb->db);
741    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
742       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
743       mdb->status = 0;
744    }
745    PQclear(result); 
746
747    Dmsg0(500, "my_postgresql_batch_end finishing\n");
748
749    return mdb->status;
750 }
751
752 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
753 {
754    int res;
755    int count=30;
756    size_t len;
757    const char *digest;
758    char ed1[50];
759
760    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
761    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
762
763    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
764    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
765
766    if (ar->Digest == NULL || ar->Digest[0] == 0) {
767       digest = "0";
768    } else {
769       digest = ar->Digest;
770    }
771
772    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
773               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
774               mdb->esc_name, ar->attr, digest);
775
776    do { 
777       res = PQputCopyData(mdb->db,
778                           mdb->cmd,
779                           len);
780    } while (res == 0 && --count > 0);
781
782    if (res == 1) {
783       Dmsg0(500, "ok\n");
784       mdb->changes++;
785       mdb->status = 1;
786    }
787
788    if (res <= 0) {
789       Dmsg0(500, "we failed\n");
790       mdb->status = 0;
791       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
792    }
793
794    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
795
796    return mdb->status;
797 }
798
799 #endif /* HAVE_BATCH_FILE_INSERT */
800
801 /*
802  * Escape strings so that PostgreSQL is happy on COPY
803  *
804  *   NOTE! len is the length of the old string. Your new
805  *         string must be long enough (max 2*old+1) to hold
806  *         the escaped output.
807  */
808 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
809 {
810    /* we have to escape \t, \n, \r, \ */
811    char c = '\0' ;
812
813    while (len > 0 && *src) {
814       switch (*src) {
815       case '\n':
816          c = 'n';
817          break;
818       case '\\':
819          c = '\\';
820          break;
821       case '\t':
822          c = 't';
823          break;
824       case '\r':
825          c = 'r';
826          break;
827       default:
828          c = '\0' ;
829       }
830
831       if (c) {
832          *dest = '\\';
833          dest++;
834          *dest = c;
835       } else {
836          *dest = *src;
837       }
838
839       len--;
840       src++;
841       dest++;
842    }
843
844    *dest = '\0';
845    return dest;
846 }
847
848 #ifdef HAVE_BATCH_FILE_INSERT
849 const char *my_pg_batch_lock_path_query = 
850    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
851
852
853 const char *my_pg_batch_lock_filename_query = 
854    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
855
856 const char *my_pg_batch_unlock_tables_query = "COMMIT";
857
858 const char *my_pg_batch_fill_path_query = 
859    "INSERT INTO Path (Path) "
860     "SELECT a.Path FROM "
861      "(SELECT DISTINCT Path FROM batch) AS a "
862       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
863
864
865 const char *my_pg_batch_fill_filename_query = 
866    "INSERT INTO Filename (Name) "
867     "SELECT a.Name FROM "
868      "(SELECT DISTINCT Name FROM batch) as a "
869       "WHERE NOT EXISTS "
870        "(SELECT Name FROM Filename WHERE Name = a.Name)";
871 #endif /* HAVE_BATCH_FILE_INSERT */
872
873 #endif /* HAVE_POSTGRESQL */