]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Fix #1335 about postgresql error message during copy session
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
51
52 /* -----------------------------------------------------------------------
53  *
54  *   PostgreSQL dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "PostgreSQL";
71
72 }
73
74 /*
75  * Initialize database data structure. In principal this should
76  * never have errors, or it is really fatal.
77  */
78 B_DB *
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80                  const char *db_address, int db_port, const char *db_socket,
81                  int mult_db_connections)
82 {
83    B_DB *mdb;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101       }
102    }
103    Dmsg0(100, "db_open first time\n");
104    mdb = (B_DB *)malloc(sizeof(B_DB));
105    memset(mdb, 0, sizeof(B_DB));
106    mdb->db_name = bstrdup(db_name);
107    mdb->db_user = bstrdup(db_user);
108    if (db_password) {
109       mdb->db_password = bstrdup(db_password);
110    }
111    if (db_address) {
112       mdb->db_address  = bstrdup(db_address);
113    }
114    if (db_socket) {
115       mdb->db_socket   = bstrdup(db_socket);
116    }
117    mdb->db_port        = db_port;
118    mdb->have_insert_id = TRUE;
119    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
120    *mdb->errmsg        = 0;
121    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
122    mdb->cached_path    = get_pool_memory(PM_FNAME);
123    mdb->cached_path_id = 0;
124    mdb->ref_count      = 1;
125    mdb->fname          = get_pool_memory(PM_FNAME);
126    mdb->path           = get_pool_memory(PM_FNAME);
127    mdb->esc_name       = get_pool_memory(PM_FNAME);
128    mdb->esc_path      = get_pool_memory(PM_FNAME);
129    mdb->allow_transactions = mult_db_connections;
130    qinsert(&db_list, &mdb->bq);            /* put db in list */
131    V(mutex);
132    return mdb;
133 }
134
135 /* Check that the database correspond to the encoding we want */
136 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
137 {
138    SQL_ROW row;
139    int ret=false;
140
141    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
142       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
143       return false;
144    }
145
146    if ((row = sql_fetch_row(mdb)) == NULL) {
147       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
148       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
149    } else {
150       ret = bstrcmp(row[0], "SQL_ASCII");
151       if (!ret) {
152          Mmsg(mdb->errmsg, 
153               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
154               mdb->db_name, row[0]);
155          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
156          Dmsg1(50, "%s", mdb->errmsg);
157       } 
158    }
159    return ret;
160 }
161
162 /*
163  * Now actually open the database.  This can generate errors,
164  *   which are returned in the errmsg
165  *
166  * DO NOT close the database or free(mdb) here !!!!
167  */
168 int
169 db_open_database(JCR *jcr, B_DB *mdb)
170 {
171    int errstat;
172    char buf[10], *port;
173
174 #ifdef xxx                      /* require libpq >= 8.2 */
175    if (!PQisthreadsafe()) {
176       Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
177            "PostgreSQL library is not thread safe. Cannot continue.\n"));
178    }
179 #endif
180    P(mutex);
181    if (mdb->connected) {
182       V(mutex);
183       return 1;
184    }
185    mdb->connected = false;
186
187    if ((errstat=rwl_init(&mdb->lock)) != 0) {
188       berrno be;
189       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
190             be.bstrerror(errstat));
191       V(mutex);
192       return 0;
193    }
194
195    if (mdb->db_port) {
196       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
197       port = buf;
198    } else {
199       port = NULL;
200    }
201
202    /* If connection fails, try at 5 sec intervals for 30 seconds. */
203    for (int retry=0; retry < 6; retry++) {
204       /* connect to the database */
205       mdb->db = PQsetdbLogin(
206            mdb->db_address,           /* default = localhost */
207            port,                      /* default port */
208            NULL,                      /* pg options */
209            NULL,                      /* tty, ignored */
210            mdb->db_name,              /* database name */
211            mdb->db_user,              /* login name */
212            mdb->db_password);         /* password */
213
214       /* If no connect, try once more in case it is a timing problem */
215       if (PQstatus(mdb->db) == CONNECTION_OK) {
216          break;
217       }
218       bmicrosleep(5, 0);
219    }
220
221    Dmsg0(50, "pg_real_connect done\n");
222    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
223             mdb->db_password==NULL?"(NULL)":mdb->db_password);
224
225    if (PQstatus(mdb->db) != CONNECTION_OK) {
226       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
227             "Database=%s User=%s\n"
228             "It is probably not running or your password is incorrect.\n"),
229              mdb->db_name, mdb->db_user);
230       V(mutex);
231       return 0;
232    }
233
234    mdb->connected = true;
235
236    if (!check_tables_version(jcr, mdb)) {
237       V(mutex);
238       return 0;
239    }
240
241    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
242    
243    /* tell PostgreSQL we are using standard conforming strings
244       and avoid warnings such as:
245        WARNING:  nonstandard use of \\ in a string literal
246    */
247    sql_query(mdb, "set standard_conforming_strings=on");
248
249    /* check that encoding is SQL_ASCII */
250    check_database_encoding(jcr, mdb);
251
252    V(mutex);
253    return 1;
254 }
255
256 void
257 db_close_database(JCR *jcr, B_DB *mdb)
258 {
259    if (!mdb) {
260       return;
261    }
262    db_end_transaction(jcr, mdb);
263    P(mutex);
264    sql_free_result(mdb);
265    mdb->ref_count--;
266    if (mdb->ref_count == 0) {
267       qdchain(&mdb->bq);
268       if (mdb->connected && mdb->db) {
269          sql_close(mdb);
270       }
271       rwl_destroy(&mdb->lock);
272       free_pool_memory(mdb->errmsg);
273       free_pool_memory(mdb->cmd);
274       free_pool_memory(mdb->cached_path);
275       free_pool_memory(mdb->fname);
276       free_pool_memory(mdb->path);
277       free_pool_memory(mdb->esc_name);
278       free_pool_memory(mdb->esc_path);
279       if (mdb->db_name) {
280          free(mdb->db_name);
281       }
282       if (mdb->db_user) {
283          free(mdb->db_user);
284       }
285       if (mdb->db_password) {
286          free(mdb->db_password);
287       }
288       if (mdb->db_address) {
289          free(mdb->db_address);
290       }
291       if (mdb->db_socket) {
292          free(mdb->db_socket);
293       }
294       free(mdb);
295    }
296    V(mutex);
297 }
298
299 void db_thread_cleanup()
300 { }
301
302 /*
303  * Return the next unique index (auto-increment) for
304  * the given table.  Return NULL on error.
305  *
306  * For PostgreSQL, NULL causes the auto-increment value
307  *  to be updated.
308  */
309 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
310 {
311    strcpy(index, "NULL");
312    return 1;
313 }
314
315
316 /*
317  * Escape strings so that PostgreSQL is happy
318  *
319  *   NOTE! len is the length of the old string. Your new
320  *         string must be long enough (max 2*old+1) to hold
321  *         the escaped output.
322  */
323 void
324 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
325 {
326    int error;
327   
328    PQescapeStringConn(mdb->db, snew, old, len, &error);
329    if (error) {
330       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
331       /* error on encoding, probably invalid multibyte encoding in the source string
332         see PQescapeStringConn documentation for details. */
333       Dmsg0(500, "PQescapeStringConn failed\n");
334    }
335 }
336
337 /*
338  * Submit a general SQL command (cmd), and for each row returned,
339  *  the sqlite_handler is called with the ctx.
340  */
341 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
342 {
343    SQL_ROW row;
344
345    Dmsg0(500, "db_sql_query started\n");
346
347    db_lock(mdb);
348    if (sql_query(mdb, query) != 0) {
349       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
350       db_unlock(mdb);
351       Dmsg0(500, "db_sql_query failed\n");
352       return false;
353    }
354    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
355
356    if (result_handler != NULL) {
357       Dmsg0(500, "db_sql_query invoking handler\n");
358       if ((mdb->result = sql_store_result(mdb)) != NULL) {
359          int num_fields = sql_num_fields(mdb);
360
361          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
362          while ((row = sql_fetch_row(mdb)) != NULL) {
363
364             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
365             if (result_handler(ctx, num_fields, row))
366                break;
367          }
368
369         sql_free_result(mdb);
370       }
371    }
372    db_unlock(mdb);
373
374    Dmsg0(500, "db_sql_query finished\n");
375
376    return true;
377 }
378
379
380
381 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
382 {
383    int j;
384    POSTGRESQL_ROW row = NULL; // by default, return NULL
385
386    Dmsg0(500, "my_postgresql_fetch_row start\n");
387
388    if (!mdb->row || mdb->row_size < mdb->num_fields) {
389       int num_fields = mdb->num_fields;
390       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
391
392       if (mdb->row) {
393          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
394          free(mdb->row);
395       }
396       num_fields += 20;                  /* add a bit extra */
397       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
398       mdb->row_size = num_fields;
399
400       // now reset the row_number now that we have the space allocated
401       mdb->row_number = 0;
402    }
403
404    // if still within the result set
405    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
406       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
407       // get each value from this row
408       for (j = 0; j < mdb->num_fields; j++) {
409          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
410          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
411       }
412       // increment the row number for the next call
413       mdb->row_number++;
414
415       row = mdb->row;
416    } else {
417       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
418    }
419
420    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
421
422    return row;
423 }
424
425 int my_postgresql_max_length(B_DB *mdb, int field_num) {
426    //
427    // for a given column, find the max length
428    //
429    int max_length;
430    int i;
431    int this_length;
432
433    max_length = 0;
434    for (i = 0; i < mdb->num_rows; i++) {
435       if (PQgetisnull(mdb->result, i, field_num)) {
436           this_length = 4;        // "NULL"
437       } else {
438           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
439       }
440
441       if (max_length < this_length) {
442           max_length = this_length;
443       }
444    }
445
446    return max_length;
447 }
448
449 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
450 {
451    int     i;
452
453    Dmsg0(500, "my_postgresql_fetch_field starts\n");
454
455    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
456       if (mdb->fields) {
457          free(mdb->fields);
458       }
459       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
460       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
461       mdb->fields_size = mdb->num_fields;
462
463       for (i = 0; i < mdb->num_fields; i++) {
464          Dmsg1(500, "filling field %d\n", i);
465          mdb->fields[i].name           = PQfname(mdb->result, i);
466          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
467          mdb->fields[i].type       = PQftype(mdb->result, i);
468          mdb->fields[i].flags      = 0;
469
470          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
471             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
472             mdb->fields[i].flags);
473       } // end for
474    } // end if
475
476    // increment field number for the next time around
477
478    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
479    return &mdb->fields[mdb->field_number++];
480 }
481
482 void my_postgresql_data_seek(B_DB *mdb, int row)
483 {
484    // set the row number to be returned on the next call
485    // to my_postgresql_fetch_row
486    mdb->row_number = row;
487 }
488
489 void my_postgresql_field_seek(B_DB *mdb, int field)
490 {
491    mdb->field_number = field;
492 }
493
494 /*
495  * Note, if this routine returns 1 (failure), Bacula expects
496  *  that no result has been stored.
497  * This is where QUERY_DB comes with Postgresql.
498  *
499  *  Returns:  0  on success
500  *            1  on failure
501  *
502  */
503 int my_postgresql_query(B_DB *mdb, const char *query)
504 {
505    Dmsg0(500, "my_postgresql_query started\n");
506    // We are starting a new query.  reset everything.
507    mdb->num_rows     = -1;
508    mdb->row_number   = -1;
509    mdb->field_number = -1;
510
511    if (mdb->result) {
512       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
513       mdb->result = NULL;
514    }
515
516    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
517
518    for (int i=0; i < 10; i++) {
519       mdb->result = PQexec(mdb->db, query);
520       if (mdb->result) {
521          break;
522       }
523       bmicrosleep(5, 0);
524    }
525    if (!mdb->result) {
526       Dmsg1(50, "Query failed: %s\n", query);
527       goto bail_out;
528    }
529
530    mdb->status = PQresultStatus(mdb->result);
531    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
532       Dmsg1(500, "we have a result\n", query);
533
534       // how many fields in the set?
535       mdb->num_fields = (int)PQnfields(mdb->result);
536       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
537
538       mdb->num_rows = PQntuples(mdb->result);
539       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
540
541       mdb->row_number = 0;      /* we can start to fetch something */
542       mdb->status = 0;          /* succeed */
543    } else {
544       Dmsg1(50, "Result status failed: %s\n", query);
545       goto bail_out;
546    }
547
548    Dmsg0(500, "my_postgresql_query finishing\n");
549    return mdb->status;
550
551 bail_out:
552    Dmsg1(500, "we failed\n", query);
553    PQclear(mdb->result);
554    mdb->result = NULL;
555    mdb->status = 1;                   /* failed */
556    return mdb->status;
557 }
558
559 void my_postgresql_free_result(B_DB *mdb)
560 {
561    
562    db_lock(mdb);
563    if (mdb->result) {
564       PQclear(mdb->result);
565       mdb->result = NULL;
566    }
567
568    if (mdb->row) {
569       free(mdb->row);
570       mdb->row = NULL;
571    }
572
573    if (mdb->fields) {
574       free(mdb->fields);
575       mdb->fields = NULL;
576    }
577    db_unlock(mdb);
578 }
579
580 int my_postgresql_currval(B_DB *mdb, const char *table_name)
581 {
582    // Obtain the current value of the sequence that
583    // provides the serial value for primary key of the table.
584
585    // currval is local to our session.  It is not affected by
586    // other transactions.
587
588    // Determine the name of the sequence.
589    // PostgreSQL automatically creates a sequence using
590    // <table>_<column>_seq.
591    // At the time of writing, all tables used this format for
592    // for their primary key: <table>id
593    // Except for basefiles which has a primary key on baseid.
594    // Therefore, we need to special case that one table.
595
596    // everything else can use the PostgreSQL formula.
597
598    char      sequence[NAMEDATALEN-1];
599    char      query   [NAMEDATALEN+50];
600    PGresult *result;
601    int       id = 0;
602
603    if (strcasecmp(table_name, "basefiles") == 0) {
604       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
605    } else {
606       bstrncpy(sequence, table_name, sizeof(sequence));
607       bstrncat(sequence, "_",        sizeof(sequence));
608       bstrncat(sequence, table_name, sizeof(sequence));
609       bstrncat(sequence, "id",       sizeof(sequence));
610    }
611
612    bstrncat(sequence, "_seq", sizeof(sequence));
613    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
614
615    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
616    for (int i=0; i < 10; i++) {
617       result = PQexec(mdb->db, query);
618       if (result) {
619          break;
620       }
621       bmicrosleep(5, 0);
622    }
623    if (!result) {
624       Dmsg1(50, "Query failed: %s\n", query);
625       goto bail_out;
626    }
627
628    Dmsg0(500, "exec done");
629
630    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
631       Dmsg0(500, "getting value");
632       id = atoi(PQgetvalue(result, 0, 0));
633       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
634    } else {
635       Dmsg1(50, "Result status failed: %s\n", query);
636       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
637    }
638
639 bail_out:
640    PQclear(result);
641
642    return id;
643 }
644
645 #ifdef HAVE_BATCH_FILE_INSERT
646
647 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
648 {
649    const char *query = "COPY batch FROM STDIN";
650
651    Dmsg0(500, "my_postgresql_batch_start started\n");
652
653    if (my_postgresql_query(mdb,
654                            "CREATE TEMPORARY TABLE batch ("
655                                "fileindex int,"
656                                "jobid int,"
657                                "path varchar,"
658                                "name varchar,"
659                                "lstat varchar,"
660                                "md5 varchar)") == 1)
661    {
662       Dmsg0(500, "my_postgresql_batch_start failed\n");
663       return 1;
664    }
665    
666    // We are starting a new query.  reset everything.
667    mdb->num_rows     = -1;
668    mdb->row_number   = -1;
669    mdb->field_number = -1;
670
671    my_postgresql_free_result(mdb);
672
673    for (int i=0; i < 10; i++) {
674       mdb->result = PQexec(mdb->db, query);
675       if (mdb->result) {
676          break;
677       }
678       bmicrosleep(5, 0);
679    }
680    if (!mdb->result) {
681       Dmsg1(50, "Query failed: %s\n", query);
682       goto bail_out;
683    }
684
685    mdb->status = PQresultStatus(mdb->result);
686    if (mdb->status == PGRES_COPY_IN) {
687       // how many fields in the set?
688       mdb->num_fields = (int) PQnfields(mdb->result);
689       mdb->num_rows   = 0;
690       mdb->status = 1;
691    } else {
692       Dmsg1(50, "Result status failed: %s\n", query);
693       goto bail_out;
694    }
695
696    Dmsg0(500, "my_postgresql_batch_start finishing\n");
697
698    return mdb->status;
699
700 bail_out:
701    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
702    mdb->status = 0;
703    PQclear(mdb->result);
704    mdb->result = NULL;
705    return mdb->status;
706 }
707
708 /* set error to something to abort operation */
709 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
710 {
711    int res;
712    int count=30;
713    PGresult *result;
714    Dmsg0(500, "my_postgresql_batch_end started\n");
715
716    if (!mdb) {                  /* no files ? */
717       return 0;
718    }
719
720    do { 
721       res = PQputCopyEnd(mdb->db, error);
722    } while (res == 0 && --count > 0);
723
724    if (res == 1) {
725       Dmsg0(500, "ok\n");
726       mdb->status = 1;
727    }
728    
729    if (res <= 0) {
730       Dmsg0(500, "we failed\n");
731       mdb->status = 0;
732       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
733    }
734
735    /* Check command status and return to normal libpq state */
736    result = PQgetResult(mdb->db);
737    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
738       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
739       mdb->status = 0;
740    }
741    PQclear(result); 
742
743    Dmsg0(500, "my_postgresql_batch_end finishing\n");
744
745    return mdb->status;
746 }
747
748 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
749 {
750    int res;
751    int count=30;
752    size_t len;
753    const char *digest;
754    char ed1[50];
755
756    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
757    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
758
759    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
760    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
761
762    if (ar->Digest == NULL || ar->Digest[0] == 0) {
763       digest = "0";
764    } else {
765       digest = ar->Digest;
766    }
767
768    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
769               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
770               mdb->esc_name, ar->attr, digest);
771
772    do { 
773       res = PQputCopyData(mdb->db,
774                           mdb->cmd,
775                           len);
776    } while (res == 0 && --count > 0);
777
778    if (res == 1) {
779       Dmsg0(500, "ok\n");
780       mdb->changes++;
781       mdb->status = 1;
782    }
783
784    if (res <= 0) {
785       Dmsg0(500, "we failed\n");
786       mdb->status = 0;
787       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
788    }
789
790    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
791
792    return mdb->status;
793 }
794
795 #endif /* HAVE_BATCH_FILE_INSERT */
796
797 /*
798  * Escape strings so that PostgreSQL is happy on COPY
799  *
800  *   NOTE! len is the length of the old string. Your new
801  *         string must be long enough (max 2*old+1) to hold
802  *         the escaped output.
803  */
804 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
805 {
806    /* we have to escape \t, \n, \r, \ */
807    char c = '\0' ;
808
809    while (len > 0 && *src) {
810       switch (*src) {
811       case '\n':
812          c = 'n';
813          break;
814       case '\\':
815          c = '\\';
816          break;
817       case '\t':
818          c = 't';
819          break;
820       case '\r':
821          c = 'r';
822          break;
823       default:
824          c = '\0' ;
825       }
826
827       if (c) {
828          *dest = '\\';
829          dest++;
830          *dest = c;
831       } else {
832          *dest = *src;
833       }
834
835       len--;
836       src++;
837       dest++;
838    }
839
840    *dest = '\0';
841    return dest;
842 }
843
844 #ifdef HAVE_BATCH_FILE_INSERT
845 const char *my_pg_batch_lock_path_query = 
846    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
847
848
849 const char *my_pg_batch_lock_filename_query = 
850    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
851
852 const char *my_pg_batch_unlock_tables_query = "COMMIT";
853
854 const char *my_pg_batch_fill_path_query = 
855    "INSERT INTO Path (Path) "
856     "SELECT a.Path FROM "
857      "(SELECT DISTINCT Path FROM batch) AS a "
858       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
859
860
861 const char *my_pg_batch_fill_filename_query = 
862    "INSERT INTO Filename (Name) "
863     "SELECT a.Name FROM "
864      "(SELECT DISTINCT Name FROM batch) as a "
865       "WHERE NOT EXISTS "
866        "(SELECT Name FROM Filename WHERE Name = a.Name)";
867 #endif /* HAVE_BATCH_FILE_INSERT */
868
869 #endif /* HAVE_POSTGRESQL */