]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
M_ABORT Bacula If batch insert is turned on when we try to open a connection and...
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
51
52 /* -----------------------------------------------------------------------
53  *
54  *   PostgreSQL dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "PostgreSQL";
71
72 }
73
74 /*
75  * Initialize database data structure. In principal this should
76  * never have errors, or it is really fatal.
77  */
78 B_DB *
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80                  const char *db_address, int db_port, const char *db_socket,
81                  int mult_db_connections)
82 {
83    B_DB *mdb;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101       }
102    }
103    Dmsg0(100, "db_open first time\n");
104    mdb = (B_DB *)malloc(sizeof(B_DB));
105    memset(mdb, 0, sizeof(B_DB));
106    mdb->db_name = bstrdup(db_name);
107    mdb->db_user = bstrdup(db_user);
108    if (db_password) {
109       mdb->db_password = bstrdup(db_password);
110    }
111    if (db_address) {
112       mdb->db_address  = bstrdup(db_address);
113    }
114    if (db_socket) {
115       mdb->db_socket   = bstrdup(db_socket);
116    }
117    mdb->db_port        = db_port;
118    mdb->have_insert_id = TRUE;
119    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
120    *mdb->errmsg        = 0;
121    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
122    mdb->cached_path    = get_pool_memory(PM_FNAME);
123    mdb->cached_path_id = 0;
124    mdb->ref_count      = 1;
125    mdb->fname          = get_pool_memory(PM_FNAME);
126    mdb->path           = get_pool_memory(PM_FNAME);
127    mdb->esc_name       = get_pool_memory(PM_FNAME);
128    mdb->esc_path      = get_pool_memory(PM_FNAME);
129    mdb->allow_transactions = mult_db_connections;
130    qinsert(&db_list, &mdb->bq);            /* put db in list */
131    V(mutex);
132    return mdb;
133 }
134
135 /* Check that the database correspond to the encoding we want */
136 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
137 {
138    SQL_ROW row;
139    int ret=false;
140
141    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
142       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
143       return false;
144    }
145
146    if ((row = sql_fetch_row(mdb)) == NULL) {
147       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
148       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
149    } else {
150       ret = bstrcmp(row[0], "SQL_ASCII");
151
152       if (ret) {
153          /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
154          db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
155
156       } else {                  /* something is wrong with database encoding */
157          Mmsg(mdb->errmsg, 
158               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
159               mdb->db_name, row[0]);
160          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
161          Dmsg1(50, "%s", mdb->errmsg);
162       } 
163    }
164    return ret;
165 }
166
167 /*
168  * Now actually open the database.  This can generate errors,
169  *   which are returned in the errmsg
170  *
171  * DO NOT close the database or free(mdb) here !!!!
172  */
173 int
174 db_open_database(JCR *jcr, B_DB *mdb)
175 {
176    int errstat;
177    char buf[10], *port;
178
179 #ifdef xxx                      /* require libpq >= 8.2 */
180    if (!PQisthreadsafe()) {
181       Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
182            "PostgreSQL library is not thread safe. Cannot continue.\n"));
183    }
184 #endif
185    P(mutex);
186    if (mdb->connected) {
187       V(mutex);
188       return 1;
189    }
190    mdb->connected = false;
191
192    if ((errstat=rwl_init(&mdb->lock)) != 0) {
193       berrno be;
194       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
195             be.bstrerror(errstat));
196       V(mutex);
197       return 0;
198    }
199
200    if (mdb->db_port) {
201       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
202       port = buf;
203    } else {
204       port = NULL;
205    }
206
207    /* If connection fails, try at 5 sec intervals for 30 seconds. */
208    for (int retry=0; retry < 6; retry++) {
209       /* connect to the database */
210       mdb->db = PQsetdbLogin(
211            mdb->db_address,           /* default = localhost */
212            port,                      /* default port */
213            NULL,                      /* pg options */
214            NULL,                      /* tty, ignored */
215            mdb->db_name,              /* database name */
216            mdb->db_user,              /* login name */
217            mdb->db_password);         /* password */
218
219       /* If no connect, try once more in case it is a timing problem */
220       if (PQstatus(mdb->db) == CONNECTION_OK) {
221          break;
222       }
223       bmicrosleep(5, 0);
224    }
225
226    Dmsg0(50, "pg_real_connect done\n");
227    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
228             mdb->db_password==NULL?"(NULL)":mdb->db_password);
229
230    if (PQstatus(mdb->db) != CONNECTION_OK) {
231       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
232          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
233          mdb->db_name, mdb->db_user);
234       V(mutex);
235       return 0;
236    }
237
238    mdb->connected = true;
239
240    if (!check_tables_version(jcr, mdb)) {
241       V(mutex);
242       return 0;
243    }
244
245    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
246    
247    /* tell PostgreSQL we are using standard conforming strings
248       and avoid warnings such as:
249        WARNING:  nonstandard use of \\ in a string literal
250    */
251    sql_query(mdb, "set standard_conforming_strings=on");
252
253    /* check that encoding is SQL_ASCII */
254    check_database_encoding(jcr, mdb);
255
256    V(mutex);
257    return 1;
258 }
259
260 void
261 db_close_database(JCR *jcr, B_DB *mdb)
262 {
263    if (!mdb) {
264       return;
265    }
266    db_end_transaction(jcr, mdb);
267    P(mutex);
268    sql_free_result(mdb);
269    mdb->ref_count--;
270    if (mdb->ref_count == 0) {
271       qdchain(&mdb->bq);
272       if (mdb->connected && mdb->db) {
273          sql_close(mdb);
274       }
275       rwl_destroy(&mdb->lock);
276       free_pool_memory(mdb->errmsg);
277       free_pool_memory(mdb->cmd);
278       free_pool_memory(mdb->cached_path);
279       free_pool_memory(mdb->fname);
280       free_pool_memory(mdb->path);
281       free_pool_memory(mdb->esc_name);
282       free_pool_memory(mdb->esc_path);
283       if (mdb->db_name) {
284          free(mdb->db_name);
285       }
286       if (mdb->db_user) {
287          free(mdb->db_user);
288       }
289       if (mdb->db_password) {
290          free(mdb->db_password);
291       }
292       if (mdb->db_address) {
293          free(mdb->db_address);
294       }
295       if (mdb->db_socket) {
296          free(mdb->db_socket);
297       }
298       free(mdb);
299    }
300    V(mutex);
301 }
302
303 void db_check_backend_thread_safe()
304 {
305 #ifdef HAVE_BATCH_FILE_INSERT
306    if (!PQisthreadsafe()) {
307       Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
308                           "when using BatchMode.\n"));
309    }
310 #endif
311 }
312
313 void db_thread_cleanup()
314 { }
315
316 /*
317  * Return the next unique index (auto-increment) for
318  * the given table.  Return NULL on error.
319  *
320  * For PostgreSQL, NULL causes the auto-increment value
321  *  to be updated.
322  */
323 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
324 {
325    strcpy(index, "NULL");
326    return 1;
327 }
328
329
330 /*
331  * Escape strings so that PostgreSQL is happy
332  *
333  *   NOTE! len is the length of the old string. Your new
334  *         string must be long enough (max 2*old+1) to hold
335  *         the escaped output.
336  */
337 void
338 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
339 {
340    int error;
341   
342    PQescapeStringConn(mdb->db, snew, old, len, &error);
343    if (error) {
344       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
345       /* error on encoding, probably invalid multibyte encoding in the source string
346         see PQescapeStringConn documentation for details. */
347       Dmsg0(500, "PQescapeStringConn failed\n");
348    }
349 }
350
351 /*
352  * Submit a general SQL command (cmd), and for each row returned,
353  *  the sqlite_handler is called with the ctx.
354  */
355 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
356 {
357    SQL_ROW row;
358
359    Dmsg0(500, "db_sql_query started\n");
360
361    db_lock(mdb);
362    if (sql_query(mdb, query) != 0) {
363       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
364       db_unlock(mdb);
365       Dmsg0(500, "db_sql_query failed\n");
366       return false;
367    }
368    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
369
370    if (result_handler != NULL) {
371       Dmsg0(500, "db_sql_query invoking handler\n");
372       if ((mdb->result = sql_store_result(mdb)) != NULL) {
373          int num_fields = sql_num_fields(mdb);
374
375          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
376          while ((row = sql_fetch_row(mdb)) != NULL) {
377
378             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
379             if (result_handler(ctx, num_fields, row))
380                break;
381          }
382
383         sql_free_result(mdb);
384       }
385    }
386    db_unlock(mdb);
387
388    Dmsg0(500, "db_sql_query finished\n");
389
390    return true;
391 }
392
393
394
395 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
396 {
397    int j;
398    POSTGRESQL_ROW row = NULL; // by default, return NULL
399
400    Dmsg0(500, "my_postgresql_fetch_row start\n");
401
402    if (!mdb->row || mdb->row_size < mdb->num_fields) {
403       int num_fields = mdb->num_fields;
404       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
405
406       if (mdb->row) {
407          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
408          free(mdb->row);
409       }
410       num_fields += 20;                  /* add a bit extra */
411       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
412       mdb->row_size = num_fields;
413
414       // now reset the row_number now that we have the space allocated
415       mdb->row_number = 0;
416    }
417
418    // if still within the result set
419    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
420       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
421       // get each value from this row
422       for (j = 0; j < mdb->num_fields; j++) {
423          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
424          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
425       }
426       // increment the row number for the next call
427       mdb->row_number++;
428
429       row = mdb->row;
430    } else {
431       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
432    }
433
434    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
435
436    return row;
437 }
438
439 int my_postgresql_max_length(B_DB *mdb, int field_num) {
440    //
441    // for a given column, find the max length
442    //
443    int max_length;
444    int i;
445    int this_length;
446
447    max_length = 0;
448    for (i = 0; i < mdb->num_rows; i++) {
449       if (PQgetisnull(mdb->result, i, field_num)) {
450           this_length = 4;        // "NULL"
451       } else {
452           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
453       }
454
455       if (max_length < this_length) {
456           max_length = this_length;
457       }
458    }
459
460    return max_length;
461 }
462
463 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
464 {
465    int     i;
466
467    Dmsg0(500, "my_postgresql_fetch_field starts\n");
468
469    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
470       if (mdb->fields) {
471          free(mdb->fields);
472       }
473       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
474       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
475       mdb->fields_size = mdb->num_fields;
476
477       for (i = 0; i < mdb->num_fields; i++) {
478          Dmsg1(500, "filling field %d\n", i);
479          mdb->fields[i].name           = PQfname(mdb->result, i);
480          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
481          mdb->fields[i].type       = PQftype(mdb->result, i);
482          mdb->fields[i].flags      = 0;
483
484          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
485             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
486             mdb->fields[i].flags);
487       } // end for
488    } // end if
489
490    // increment field number for the next time around
491
492    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
493    return &mdb->fields[mdb->field_number++];
494 }
495
496 void my_postgresql_data_seek(B_DB *mdb, int row)
497 {
498    // set the row number to be returned on the next call
499    // to my_postgresql_fetch_row
500    mdb->row_number = row;
501 }
502
503 void my_postgresql_field_seek(B_DB *mdb, int field)
504 {
505    mdb->field_number = field;
506 }
507
508 /*
509  * Note, if this routine returns 1 (failure), Bacula expects
510  *  that no result has been stored.
511  * This is where QUERY_DB comes with Postgresql.
512  *
513  *  Returns:  0  on success
514  *            1  on failure
515  *
516  */
517 int my_postgresql_query(B_DB *mdb, const char *query)
518 {
519    Dmsg0(500, "my_postgresql_query started\n");
520    // We are starting a new query.  reset everything.
521    mdb->num_rows     = -1;
522    mdb->row_number   = -1;
523    mdb->field_number = -1;
524
525    if (mdb->result) {
526       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
527       mdb->result = NULL;
528    }
529
530    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
531
532    for (int i=0; i < 10; i++) {
533       mdb->result = PQexec(mdb->db, query);
534       if (mdb->result) {
535          break;
536       }
537       bmicrosleep(5, 0);
538    }
539    if (!mdb->result) {
540       Dmsg1(50, "Query failed: %s\n", query);
541       goto bail_out;
542    }
543
544    mdb->status = PQresultStatus(mdb->result);
545    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
546       Dmsg1(500, "we have a result\n", query);
547
548       // how many fields in the set?
549       mdb->num_fields = (int)PQnfields(mdb->result);
550       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
551
552       mdb->num_rows = PQntuples(mdb->result);
553       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
554
555       mdb->row_number = 0;      /* we can start to fetch something */
556       mdb->status = 0;          /* succeed */
557    } else {
558       Dmsg1(50, "Result status failed: %s\n", query);
559       goto bail_out;
560    }
561
562    Dmsg0(500, "my_postgresql_query finishing\n");
563    return mdb->status;
564
565 bail_out:
566    Dmsg1(500, "we failed\n", query);
567    PQclear(mdb->result);
568    mdb->result = NULL;
569    mdb->status = 1;                   /* failed */
570    return mdb->status;
571 }
572
573 void my_postgresql_free_result(B_DB *mdb)
574 {
575    
576    db_lock(mdb);
577    if (mdb->result) {
578       PQclear(mdb->result);
579       mdb->result = NULL;
580    }
581
582    if (mdb->row) {
583       free(mdb->row);
584       mdb->row = NULL;
585    }
586
587    if (mdb->fields) {
588       free(mdb->fields);
589       mdb->fields = NULL;
590    }
591    db_unlock(mdb);
592 }
593
594 int my_postgresql_currval(B_DB *mdb, const char *table_name)
595 {
596    // Obtain the current value of the sequence that
597    // provides the serial value for primary key of the table.
598
599    // currval is local to our session.  It is not affected by
600    // other transactions.
601
602    // Determine the name of the sequence.
603    // PostgreSQL automatically creates a sequence using
604    // <table>_<column>_seq.
605    // At the time of writing, all tables used this format for
606    // for their primary key: <table>id
607    // Except for basefiles which has a primary key on baseid.
608    // Therefore, we need to special case that one table.
609
610    // everything else can use the PostgreSQL formula.
611
612    char      sequence[NAMEDATALEN-1];
613    char      query   [NAMEDATALEN+50];
614    PGresult *result;
615    int       id = 0;
616
617    if (strcasecmp(table_name, "basefiles") == 0) {
618       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
619    } else {
620       bstrncpy(sequence, table_name, sizeof(sequence));
621       bstrncat(sequence, "_",        sizeof(sequence));
622       bstrncat(sequence, table_name, sizeof(sequence));
623       bstrncat(sequence, "id",       sizeof(sequence));
624    }
625
626    bstrncat(sequence, "_seq", sizeof(sequence));
627    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
628
629    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
630    for (int i=0; i < 10; i++) {
631       result = PQexec(mdb->db, query);
632       if (result) {
633          break;
634       }
635       bmicrosleep(5, 0);
636    }
637    if (!result) {
638       Dmsg1(50, "Query failed: %s\n", query);
639       goto bail_out;
640    }
641
642    Dmsg0(500, "exec done");
643
644    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
645       Dmsg0(500, "getting value");
646       id = atoi(PQgetvalue(result, 0, 0));
647       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
648    } else {
649       Dmsg1(50, "Result status failed: %s\n", query);
650       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
651    }
652
653 bail_out:
654    PQclear(result);
655
656    return id;
657 }
658
659 #ifdef HAVE_BATCH_FILE_INSERT
660
661 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
662 {
663    const char *query = "COPY batch FROM STDIN";
664
665    Dmsg0(500, "my_postgresql_batch_start started\n");
666
667    if (my_postgresql_query(mdb,
668                            "CREATE TEMPORARY TABLE batch ("
669                                "fileindex int,"
670                                "jobid int,"
671                                "path varchar,"
672                                "name varchar,"
673                                "lstat varchar,"
674                                "md5 varchar)") == 1)
675    {
676       Dmsg0(500, "my_postgresql_batch_start failed\n");
677       return 1;
678    }
679    
680    // We are starting a new query.  reset everything.
681    mdb->num_rows     = -1;
682    mdb->row_number   = -1;
683    mdb->field_number = -1;
684
685    my_postgresql_free_result(mdb);
686
687    for (int i=0; i < 10; i++) {
688       mdb->result = PQexec(mdb->db, query);
689       if (mdb->result) {
690          break;
691       }
692       bmicrosleep(5, 0);
693    }
694    if (!mdb->result) {
695       Dmsg1(50, "Query failed: %s\n", query);
696       goto bail_out;
697    }
698
699    mdb->status = PQresultStatus(mdb->result);
700    if (mdb->status == PGRES_COPY_IN) {
701       // how many fields in the set?
702       mdb->num_fields = (int) PQnfields(mdb->result);
703       mdb->num_rows   = 0;
704       mdb->status = 1;
705    } else {
706       Dmsg1(50, "Result status failed: %s\n", query);
707       goto bail_out;
708    }
709
710    Dmsg0(500, "my_postgresql_batch_start finishing\n");
711
712    return mdb->status;
713
714 bail_out:
715    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
716    mdb->status = 0;
717    PQclear(mdb->result);
718    mdb->result = NULL;
719    return mdb->status;
720 }
721
722 /* set error to something to abort operation */
723 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
724 {
725    int res;
726    int count=30;
727    PGresult *result;
728    Dmsg0(500, "my_postgresql_batch_end started\n");
729
730    if (!mdb) {                  /* no files ? */
731       return 0;
732    }
733
734    do { 
735       res = PQputCopyEnd(mdb->db, error);
736    } while (res == 0 && --count > 0);
737
738    if (res == 1) {
739       Dmsg0(500, "ok\n");
740       mdb->status = 1;
741    }
742    
743    if (res <= 0) {
744       Dmsg0(500, "we failed\n");
745       mdb->status = 0;
746       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
747    }
748
749    /* Check command status and return to normal libpq state */
750    result = PQgetResult(mdb->db);
751    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
752       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
753       mdb->status = 0;
754    }
755    PQclear(result); 
756
757    Dmsg0(500, "my_postgresql_batch_end finishing\n");
758
759    return mdb->status;
760 }
761
762 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
763 {
764    int res;
765    int count=30;
766    size_t len;
767    const char *digest;
768    char ed1[50];
769
770    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
771    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
772
773    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
774    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
775
776    if (ar->Digest == NULL || ar->Digest[0] == 0) {
777       digest = "0";
778    } else {
779       digest = ar->Digest;
780    }
781
782    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
783               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
784               mdb->esc_name, ar->attr, digest);
785
786    do { 
787       res = PQputCopyData(mdb->db,
788                           mdb->cmd,
789                           len);
790    } while (res == 0 && --count > 0);
791
792    if (res == 1) {
793       Dmsg0(500, "ok\n");
794       mdb->changes++;
795       mdb->status = 1;
796    }
797
798    if (res <= 0) {
799       Dmsg0(500, "we failed\n");
800       mdb->status = 0;
801       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
802    }
803
804    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
805
806    return mdb->status;
807 }
808
809 #endif /* HAVE_BATCH_FILE_INSERT */
810
811 /*
812  * Escape strings so that PostgreSQL is happy on COPY
813  *
814  *   NOTE! len is the length of the old string. Your new
815  *         string must be long enough (max 2*old+1) to hold
816  *         the escaped output.
817  */
818 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
819 {
820    /* we have to escape \t, \n, \r, \ */
821    char c = '\0' ;
822
823    while (len > 0 && *src) {
824       switch (*src) {
825       case '\n':
826          c = 'n';
827          break;
828       case '\\':
829          c = '\\';
830          break;
831       case '\t':
832          c = 't';
833          break;
834       case '\r':
835          c = 'r';
836          break;
837       default:
838          c = '\0' ;
839       }
840
841       if (c) {
842          *dest = '\\';
843          dest++;
844          *dest = c;
845       } else {
846          *dest = *src;
847       }
848
849       len--;
850       src++;
851       dest++;
852    }
853
854    *dest = '\0';
855    return dest;
856 }
857
858 #ifdef HAVE_BATCH_FILE_INSERT
859 const char *my_pg_batch_lock_path_query = 
860    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
861
862
863 const char *my_pg_batch_lock_filename_query = 
864    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
865
866 const char *my_pg_batch_unlock_tables_query = "COMMIT";
867
868 const char *my_pg_batch_fill_path_query = 
869    "INSERT INTO Path (Path) "
870     "SELECT a.Path FROM "
871      "(SELECT DISTINCT Path FROM batch) AS a "
872       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
873
874
875 const char *my_pg_batch_fill_filename_query = 
876    "INSERT INTO Filename (Name) "
877     "SELECT a.Name FROM "
878      "(SELECT DISTINCT Name FROM batch) as a "
879       "WHERE NOT EXISTS "
880        "(SELECT Name FROM Filename WHERE Name = a.Name)";
881 #endif /* HAVE_BATCH_FILE_INSERT */
882
883 #endif /* HAVE_POSTGRESQL */