]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Drop have_insert_id in mdb as its always true for all backends.
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
51
52 /* -----------------------------------------------------------------------
53  *
54  *   PostgreSQL dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "PostgreSQL";
71
72 }
73
74 /*
75  * Initialize database data structure. In principal this should
76  * never have errors, or it is really fatal.
77  */
78 B_DB *
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80                  const char *db_address, int db_port, const char *db_socket,
81                  int mult_db_connections)
82 {
83    B_DB *mdb;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101       }
102    }
103    Dmsg0(100, "db_open first time\n");
104    mdb = (B_DB *)malloc(sizeof(B_DB));
105    memset(mdb, 0, sizeof(B_DB));
106    mdb->db_name = bstrdup(db_name);
107    mdb->db_user = bstrdup(db_user);
108    if (db_password) {
109       mdb->db_password = bstrdup(db_password);
110    }
111    if (db_address) {
112       mdb->db_address  = bstrdup(db_address);
113    }
114    if (db_socket) {
115       mdb->db_socket   = bstrdup(db_socket);
116    }
117    mdb->db_port        = db_port;
118    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
119    *mdb->errmsg        = 0;
120    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
121    mdb->cached_path    = get_pool_memory(PM_FNAME);
122    mdb->cached_path_id = 0;
123    mdb->ref_count      = 1;
124    mdb->fname          = get_pool_memory(PM_FNAME);
125    mdb->path           = get_pool_memory(PM_FNAME);
126    mdb->esc_name       = get_pool_memory(PM_FNAME);
127    mdb->esc_path      = get_pool_memory(PM_FNAME);
128    mdb->allow_transactions = mult_db_connections;
129    qinsert(&db_list, &mdb->bq);            /* put db in list */
130    V(mutex);
131    return mdb;
132 }
133
134 /* Check that the database correspond to the encoding we want */
135 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
136 {
137    SQL_ROW row;
138    int ret=false;
139
140    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
141       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
142       return false;
143    }
144
145    if ((row = sql_fetch_row(mdb)) == NULL) {
146       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
147       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
148    } else {
149       ret = bstrcmp(row[0], "SQL_ASCII");
150
151       if (ret) {
152          /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
153          db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
154
155       } else {                  /* something is wrong with database encoding */
156          Mmsg(mdb->errmsg, 
157               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
158               mdb->db_name, row[0]);
159          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
160          Dmsg1(50, "%s", mdb->errmsg);
161       } 
162    }
163    return ret;
164 }
165
166 /*
167  * Now actually open the database.  This can generate errors,
168  *   which are returned in the errmsg
169  *
170  * DO NOT close the database or free(mdb) here !!!!
171  */
172 int
173 db_open_database(JCR *jcr, B_DB *mdb)
174 {
175    int errstat;
176    char buf[10], *port;
177
178    P(mutex);
179    if (mdb->connected) {
180       V(mutex);
181       return 1;
182    }
183    mdb->connected = false;
184
185    if ((errstat=rwl_init(&mdb->lock)) != 0) {
186       berrno be;
187       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
188             be.bstrerror(errstat));
189       V(mutex);
190       return 0;
191    }
192
193    if (mdb->db_port) {
194       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
195       port = buf;
196    } else {
197       port = NULL;
198    }
199
200    /* If connection fails, try at 5 sec intervals for 30 seconds. */
201    for (int retry=0; retry < 6; retry++) {
202       /* connect to the database */
203       mdb->db = PQsetdbLogin(
204            mdb->db_address,           /* default = localhost */
205            port,                      /* default port */
206            NULL,                      /* pg options */
207            NULL,                      /* tty, ignored */
208            mdb->db_name,              /* database name */
209            mdb->db_user,              /* login name */
210            mdb->db_password);         /* password */
211
212       /* If no connect, try once more in case it is a timing problem */
213       if (PQstatus(mdb->db) == CONNECTION_OK) {
214          break;
215       }
216       bmicrosleep(5, 0);
217    }
218
219    Dmsg0(50, "pg_real_connect done\n");
220    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
221             mdb->db_password==NULL?"(NULL)":mdb->db_password);
222
223    if (PQstatus(mdb->db) != CONNECTION_OK) {
224       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
225          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
226          mdb->db_name, mdb->db_user);
227       V(mutex);
228       return 0;
229    }
230
231    mdb->connected = true;
232
233    if (!check_tables_version(jcr, mdb)) {
234       V(mutex);
235       return 0;
236    }
237
238    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
239    
240    /* tell PostgreSQL we are using standard conforming strings
241       and avoid warnings such as:
242        WARNING:  nonstandard use of \\ in a string literal
243    */
244    sql_query(mdb, "set standard_conforming_strings=on");
245
246    /* check that encoding is SQL_ASCII */
247    check_database_encoding(jcr, mdb);
248
249    V(mutex);
250    return 1;
251 }
252
253 void
254 db_close_database(JCR *jcr, B_DB *mdb)
255 {
256    if (!mdb) {
257       return;
258    }
259    db_end_transaction(jcr, mdb);
260    P(mutex);
261    sql_free_result(mdb);
262    mdb->ref_count--;
263    if (mdb->ref_count == 0) {
264       qdchain(&mdb->bq);
265       if (mdb->connected && mdb->db) {
266          sql_close(mdb);
267       }
268       rwl_destroy(&mdb->lock);
269       free_pool_memory(mdb->errmsg);
270       free_pool_memory(mdb->cmd);
271       free_pool_memory(mdb->cached_path);
272       free_pool_memory(mdb->fname);
273       free_pool_memory(mdb->path);
274       free_pool_memory(mdb->esc_name);
275       free_pool_memory(mdb->esc_path);
276       if (mdb->db_name) {
277          free(mdb->db_name);
278       }
279       if (mdb->db_user) {
280          free(mdb->db_user);
281       }
282       if (mdb->db_password) {
283          free(mdb->db_password);
284       }
285       if (mdb->db_address) {
286          free(mdb->db_address);
287       }
288       if (mdb->db_socket) {
289          free(mdb->db_socket);
290       }
291       free(mdb);
292    }
293    V(mutex);
294 }
295
296 void db_check_backend_thread_safe()
297 {
298 #ifdef HAVE_BATCH_FILE_INSERT
299 # ifdef HAVE_PQISTHREADSAFE 
300    if (!PQisthreadsafe()) {
301       Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
302                           "when using BatchMode.\n"));
303    }
304 # endif
305 #endif
306 }
307
308 void db_thread_cleanup()
309 { }
310
311 /*
312  * Return the next unique index (auto-increment) for
313  * the given table.  Return NULL on error.
314  *
315  * For PostgreSQL, NULL causes the auto-increment value
316  *  to be updated.
317  */
318 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
319 {
320    strcpy(index, "NULL");
321    return 1;
322 }
323
324
325 /*
326  * Escape strings so that PostgreSQL is happy
327  *
328  *   NOTE! len is the length of the old string. Your new
329  *         string must be long enough (max 2*old+1) to hold
330  *         the escaped output.
331  */
332 void
333 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
334 {
335    int error;
336   
337    PQescapeStringConn(mdb->db, snew, old, len, &error);
338    if (error) {
339       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
340       /* error on encoding, probably invalid multibyte encoding in the source string
341         see PQescapeStringConn documentation for details. */
342       Dmsg0(500, "PQescapeStringConn failed\n");
343    }
344 }
345
346 /*
347  * Submit a general SQL command (cmd), and for each row returned,
348  *  the sqlite_handler is called with the ctx.
349  */
350 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
351 {
352    SQL_ROW row;
353
354    Dmsg0(500, "db_sql_query started\n");
355
356    db_lock(mdb);
357    if (sql_query(mdb, query) != 0) {
358       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
359       db_unlock(mdb);
360       Dmsg0(500, "db_sql_query failed\n");
361       return false;
362    }
363    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
364
365    if (result_handler != NULL) {
366       Dmsg0(500, "db_sql_query invoking handler\n");
367       if ((mdb->result = sql_store_result(mdb)) != NULL) {
368          int num_fields = sql_num_fields(mdb);
369
370          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
371          while ((row = sql_fetch_row(mdb)) != NULL) {
372
373             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
374             if (result_handler(ctx, num_fields, row))
375                break;
376          }
377
378         sql_free_result(mdb);
379       }
380    }
381    db_unlock(mdb);
382
383    Dmsg0(500, "db_sql_query finished\n");
384
385    return true;
386 }
387
388
389
390 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
391 {
392    int j;
393    POSTGRESQL_ROW row = NULL; // by default, return NULL
394
395    Dmsg0(500, "my_postgresql_fetch_row start\n");
396
397    if (!mdb->row || mdb->row_size < mdb->num_fields) {
398       int num_fields = mdb->num_fields;
399       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
400
401       if (mdb->row) {
402          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
403          free(mdb->row);
404       }
405       num_fields += 20;                  /* add a bit extra */
406       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
407       mdb->row_size = num_fields;
408
409       // now reset the row_number now that we have the space allocated
410       mdb->row_number = 0;
411    }
412
413    // if still within the result set
414    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
415       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
416       // get each value from this row
417       for (j = 0; j < mdb->num_fields; j++) {
418          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
419          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
420       }
421       // increment the row number for the next call
422       mdb->row_number++;
423
424       row = mdb->row;
425    } else {
426       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
427    }
428
429    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
430
431    return row;
432 }
433
434 int my_postgresql_max_length(B_DB *mdb, int field_num) {
435    //
436    // for a given column, find the max length
437    //
438    int max_length;
439    int i;
440    int this_length;
441
442    max_length = 0;
443    for (i = 0; i < mdb->num_rows; i++) {
444       if (PQgetisnull(mdb->result, i, field_num)) {
445           this_length = 4;        // "NULL"
446       } else {
447           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
448       }
449
450       if (max_length < this_length) {
451           max_length = this_length;
452       }
453    }
454
455    return max_length;
456 }
457
458 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
459 {
460    int     i;
461
462    Dmsg0(500, "my_postgresql_fetch_field starts\n");
463
464    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
465       if (mdb->fields) {
466          free(mdb->fields);
467       }
468       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
469       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
470       mdb->fields_size = mdb->num_fields;
471
472       for (i = 0; i < mdb->num_fields; i++) {
473          Dmsg1(500, "filling field %d\n", i);
474          mdb->fields[i].name           = PQfname(mdb->result, i);
475          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
476          mdb->fields[i].type       = PQftype(mdb->result, i);
477          mdb->fields[i].flags      = 0;
478
479          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
480             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
481             mdb->fields[i].flags);
482       } // end for
483    } // end if
484
485    // increment field number for the next time around
486
487    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
488    return &mdb->fields[mdb->field_number++];
489 }
490
491 void my_postgresql_data_seek(B_DB *mdb, int row)
492 {
493    // set the row number to be returned on the next call
494    // to my_postgresql_fetch_row
495    mdb->row_number = row;
496 }
497
498 void my_postgresql_field_seek(B_DB *mdb, int field)
499 {
500    mdb->field_number = field;
501 }
502
503 /*
504  * Note, if this routine returns 1 (failure), Bacula expects
505  *  that no result has been stored.
506  * This is where QUERY_DB comes with Postgresql.
507  *
508  *  Returns:  0  on success
509  *            1  on failure
510  *
511  */
512 int my_postgresql_query(B_DB *mdb, const char *query)
513 {
514    Dmsg0(500, "my_postgresql_query started\n");
515    // We are starting a new query.  reset everything.
516    mdb->num_rows     = -1;
517    mdb->row_number   = -1;
518    mdb->field_number = -1;
519
520    if (mdb->result) {
521       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
522       mdb->result = NULL;
523    }
524
525    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
526
527    for (int i=0; i < 10; i++) {
528       mdb->result = PQexec(mdb->db, query);
529       if (mdb->result) {
530          break;
531       }
532       bmicrosleep(5, 0);
533    }
534    if (!mdb->result) {
535       Dmsg1(50, "Query failed: %s\n", query);
536       goto bail_out;
537    }
538
539    mdb->status = PQresultStatus(mdb->result);
540    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
541       Dmsg1(500, "we have a result\n", query);
542
543       // how many fields in the set?
544       mdb->num_fields = (int)PQnfields(mdb->result);
545       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
546
547       mdb->num_rows = PQntuples(mdb->result);
548       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
549
550       mdb->row_number = 0;      /* we can start to fetch something */
551       mdb->status = 0;          /* succeed */
552    } else {
553       Dmsg1(50, "Result status failed: %s\n", query);
554       goto bail_out;
555    }
556
557    Dmsg0(500, "my_postgresql_query finishing\n");
558    return mdb->status;
559
560 bail_out:
561    Dmsg1(500, "we failed\n", query);
562    PQclear(mdb->result);
563    mdb->result = NULL;
564    mdb->status = 1;                   /* failed */
565    return mdb->status;
566 }
567
568 void my_postgresql_free_result(B_DB *mdb)
569 {
570    
571    db_lock(mdb);
572    if (mdb->result) {
573       PQclear(mdb->result);
574       mdb->result = NULL;
575    }
576
577    if (mdb->row) {
578       free(mdb->row);
579       mdb->row = NULL;
580    }
581
582    if (mdb->fields) {
583       free(mdb->fields);
584       mdb->fields = NULL;
585    }
586    db_unlock(mdb);
587 }
588
589 int my_postgresql_currval(B_DB *mdb, const char *table_name)
590 {
591    // Obtain the current value of the sequence that
592    // provides the serial value for primary key of the table.
593
594    // currval is local to our session.  It is not affected by
595    // other transactions.
596
597    // Determine the name of the sequence.
598    // PostgreSQL automatically creates a sequence using
599    // <table>_<column>_seq.
600    // At the time of writing, all tables used this format for
601    // for their primary key: <table>id
602    // Except for basefiles which has a primary key on baseid.
603    // Therefore, we need to special case that one table.
604
605    // everything else can use the PostgreSQL formula.
606
607    char      sequence[NAMEDATALEN-1];
608    char      query   [NAMEDATALEN+50];
609    PGresult *result;
610    int       id = 0;
611
612    if (strcasecmp(table_name, "basefiles") == 0) {
613       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
614    } else {
615       bstrncpy(sequence, table_name, sizeof(sequence));
616       bstrncat(sequence, "_",        sizeof(sequence));
617       bstrncat(sequence, table_name, sizeof(sequence));
618       bstrncat(sequence, "id",       sizeof(sequence));
619    }
620
621    bstrncat(sequence, "_seq", sizeof(sequence));
622    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
623
624    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
625    for (int i=0; i < 10; i++) {
626       result = PQexec(mdb->db, query);
627       if (result) {
628          break;
629       }
630       bmicrosleep(5, 0);
631    }
632    if (!result) {
633       Dmsg1(50, "Query failed: %s\n", query);
634       goto bail_out;
635    }
636
637    Dmsg0(500, "exec done");
638
639    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
640       Dmsg0(500, "getting value");
641       id = atoi(PQgetvalue(result, 0, 0));
642       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
643    } else {
644       Dmsg1(50, "Result status failed: %s\n", query);
645       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
646    }
647
648 bail_out:
649    PQclear(result);
650
651    return id;
652 }
653
654 #ifdef HAVE_BATCH_FILE_INSERT
655
656 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
657 {
658    const char *query = "COPY batch FROM STDIN";
659
660    Dmsg0(500, "my_postgresql_batch_start started\n");
661
662    if (my_postgresql_query(mdb,
663                            "CREATE TEMPORARY TABLE batch ("
664                                "fileindex int,"
665                                "jobid int,"
666                                "path varchar,"
667                                "name varchar,"
668                                "lstat varchar,"
669                                "md5 varchar)") == 1)
670    {
671       Dmsg0(500, "my_postgresql_batch_start failed\n");
672       return 1;
673    }
674    
675    // We are starting a new query.  reset everything.
676    mdb->num_rows     = -1;
677    mdb->row_number   = -1;
678    mdb->field_number = -1;
679
680    my_postgresql_free_result(mdb);
681
682    for (int i=0; i < 10; i++) {
683       mdb->result = PQexec(mdb->db, query);
684       if (mdb->result) {
685          break;
686       }
687       bmicrosleep(5, 0);
688    }
689    if (!mdb->result) {
690       Dmsg1(50, "Query failed: %s\n", query);
691       goto bail_out;
692    }
693
694    mdb->status = PQresultStatus(mdb->result);
695    if (mdb->status == PGRES_COPY_IN) {
696       // how many fields in the set?
697       mdb->num_fields = (int) PQnfields(mdb->result);
698       mdb->num_rows   = 0;
699       mdb->status = 1;
700    } else {
701       Dmsg1(50, "Result status failed: %s\n", query);
702       goto bail_out;
703    }
704
705    Dmsg0(500, "my_postgresql_batch_start finishing\n");
706
707    return mdb->status;
708
709 bail_out:
710    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
711    mdb->status = 0;
712    PQclear(mdb->result);
713    mdb->result = NULL;
714    return mdb->status;
715 }
716
717 /* set error to something to abort operation */
718 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
719 {
720    int res;
721    int count=30;
722    PGresult *result;
723    Dmsg0(500, "my_postgresql_batch_end started\n");
724
725    if (!mdb) {                  /* no files ? */
726       return 0;
727    }
728
729    do { 
730       res = PQputCopyEnd(mdb->db, error);
731    } while (res == 0 && --count > 0);
732
733    if (res == 1) {
734       Dmsg0(500, "ok\n");
735       mdb->status = 1;
736    }
737    
738    if (res <= 0) {
739       Dmsg0(500, "we failed\n");
740       mdb->status = 0;
741       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
742    }
743
744    /* Check command status and return to normal libpq state */
745    result = PQgetResult(mdb->db);
746    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
747       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
748       mdb->status = 0;
749    }
750    PQclear(result); 
751
752    Dmsg0(500, "my_postgresql_batch_end finishing\n");
753
754    return mdb->status;
755 }
756
757 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
758 {
759    int res;
760    int count=30;
761    size_t len;
762    const char *digest;
763    char ed1[50];
764
765    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
766    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
767
768    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
769    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
770
771    if (ar->Digest == NULL || ar->Digest[0] == 0) {
772       digest = "0";
773    } else {
774       digest = ar->Digest;
775    }
776
777    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
778               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
779               mdb->esc_name, ar->attr, digest);
780
781    do { 
782       res = PQputCopyData(mdb->db,
783                           mdb->cmd,
784                           len);
785    } while (res == 0 && --count > 0);
786
787    if (res == 1) {
788       Dmsg0(500, "ok\n");
789       mdb->changes++;
790       mdb->status = 1;
791    }
792
793    if (res <= 0) {
794       Dmsg0(500, "we failed\n");
795       mdb->status = 0;
796       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
797    }
798
799    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
800
801    return mdb->status;
802 }
803
804 #endif /* HAVE_BATCH_FILE_INSERT */
805
806 /*
807  * Escape strings so that PostgreSQL is happy on COPY
808  *
809  *   NOTE! len is the length of the old string. Your new
810  *         string must be long enough (max 2*old+1) to hold
811  *         the escaped output.
812  */
813 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
814 {
815    /* we have to escape \t, \n, \r, \ */
816    char c = '\0' ;
817
818    while (len > 0 && *src) {
819       switch (*src) {
820       case '\n':
821          c = 'n';
822          break;
823       case '\\':
824          c = '\\';
825          break;
826       case '\t':
827          c = 't';
828          break;
829       case '\r':
830          c = 'r';
831          break;
832       default:
833          c = '\0' ;
834       }
835
836       if (c) {
837          *dest = '\\';
838          dest++;
839          *dest = c;
840       } else {
841          *dest = *src;
842       }
843
844       len--;
845       src++;
846       dest++;
847    }
848
849    *dest = '\0';
850    return dest;
851 }
852
853 #ifdef HAVE_BATCH_FILE_INSERT
854 const char *my_pg_batch_lock_path_query = 
855    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
856
857
858 const char *my_pg_batch_lock_filename_query = 
859    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
860
861 const char *my_pg_batch_unlock_tables_query = "COMMIT";
862
863 const char *my_pg_batch_fill_path_query = 
864    "INSERT INTO Path (Path) "
865     "SELECT a.Path FROM "
866      "(SELECT DISTINCT Path FROM batch) AS a "
867       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
868
869
870 const char *my_pg_batch_fill_filename_query = 
871    "INSERT INTO Filename (Name) "
872     "SELECT a.Name FROM "
873      "(SELECT DISTINCT Name FROM batch) as a "
874       "WHERE NOT EXISTS "
875        "(SELECT Name FROM Filename WHERE Name = a.Name)";
876 #endif /* HAVE_BATCH_FILE_INSERT */
877
878 #endif /* HAVE_POSTGRESQL */