]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
f6919995cc11f3b3e23344515f8c48285d35bacc
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  */
36
37
38 /* The following is necessary so that we do not include
39  * the dummy external definition of DB.
40  */
41 #define __SQL_C                       /* indicate that this is sql.c */
42
43 #include "bacula.h"
44 #include "cats.h"
45
46 #ifdef HAVE_POSTGRESQL
47
48 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
49 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static dlist *db_list = NULL;
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb = NULL;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (db_list == NULL) {
90       db_list = New(dlist(mdb, &mdb->link));
91    }
92    if (!mult_db_connections) {
93       /* Look to see if DB already open */
94       foreach_dlist(mdb, db_list) {
95          if (bstrcmp(mdb->db_name, db_name) &&
96              bstrcmp(mdb->db_address, db_address) &&
97              mdb->db_port == db_port) {
98             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
99             mdb->ref_count++;
100             V(mutex);
101             return mdb;                  /* already open */
102          }
103       }
104    }
105    Dmsg0(100, "db_open first time\n");
106    mdb = (B_DB *)malloc(sizeof(B_DB));
107    memset(mdb, 0, sizeof(B_DB));
108    mdb->db_name = bstrdup(db_name);
109    mdb->db_user = bstrdup(db_user);
110    if (db_password) {
111       mdb->db_password = bstrdup(db_password);
112    }
113    if (db_address) {
114       mdb->db_address  = bstrdup(db_address);
115    }
116    if (db_socket) {
117       mdb->db_socket   = bstrdup(db_socket);
118    }
119    mdb->db_port        = db_port;
120    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
121    *mdb->errmsg        = 0;
122    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
123    mdb->cached_path    = get_pool_memory(PM_FNAME);
124    mdb->cached_path_id = 0;
125    mdb->ref_count      = 1;
126    mdb->fname          = get_pool_memory(PM_FNAME);
127    mdb->path           = get_pool_memory(PM_FNAME);
128    mdb->esc_name       = get_pool_memory(PM_FNAME);
129    mdb->esc_path      = get_pool_memory(PM_FNAME);
130    mdb->allow_transactions = mult_db_connections;
131    db_list->append(mdb);                   /* put db in list */
132    V(mutex);
133    return mdb;
134 }
135
136 /* Check that the database correspond to the encoding we want */
137 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
138 {
139    SQL_ROW row;
140    int ret=false;
141
142    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
143       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
144       return false;
145    }
146
147    if ((row = sql_fetch_row(mdb)) == NULL) {
148       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
149       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
150    } else {
151       ret = bstrcmp(row[0], "SQL_ASCII");
152
153       if (ret) {
154          /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
155          db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
156
157       } else {                  /* something is wrong with database encoding */
158          Mmsg(mdb->errmsg, 
159               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
160               mdb->db_name, row[0]);
161          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
162          Dmsg1(50, "%s", mdb->errmsg);
163       } 
164    }
165    return ret;
166 }
167
168 /*
169  * Now actually open the database.  This can generate errors,
170  *   which are returned in the errmsg
171  *
172  * DO NOT close the database or free(mdb) here !!!!
173  */
174 int
175 db_open_database(JCR *jcr, B_DB *mdb)
176 {
177    int errstat;
178    char buf[10], *port;
179
180    P(mutex);
181    if (mdb->connected) {
182       V(mutex);
183       return 1;
184    }
185    mdb->connected = false;
186
187    if ((errstat=rwl_init(&mdb->lock)) != 0) {
188       berrno be;
189       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
190             be.bstrerror(errstat));
191       V(mutex);
192       return 0;
193    }
194
195    if (mdb->db_port) {
196       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
197       port = buf;
198    } else {
199       port = NULL;
200    }
201
202    /* If connection fails, try at 5 sec intervals for 30 seconds. */
203    for (int retry=0; retry < 6; retry++) {
204       /* connect to the database */
205       mdb->db = PQsetdbLogin(
206            mdb->db_address,           /* default = localhost */
207            port,                      /* default port */
208            NULL,                      /* pg options */
209            NULL,                      /* tty, ignored */
210            mdb->db_name,              /* database name */
211            mdb->db_user,              /* login name */
212            mdb->db_password);         /* password */
213
214       /* If no connect, try once more in case it is a timing problem */
215       if (PQstatus(mdb->db) == CONNECTION_OK) {
216          break;
217       }
218       bmicrosleep(5, 0);
219    }
220
221    Dmsg0(50, "pg_real_connect done\n");
222    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
223             mdb->db_password==NULL?"(NULL)":mdb->db_password);
224
225    if (PQstatus(mdb->db) != CONNECTION_OK) {
226       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
227          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
228          mdb->db_name, mdb->db_user);
229       V(mutex);
230       return 0;
231    }
232
233    mdb->connected = true;
234
235    if (!check_tables_version(jcr, mdb)) {
236       V(mutex);
237       return 0;
238    }
239
240    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
241    
242    /* tell PostgreSQL we are using standard conforming strings
243       and avoid warnings such as:
244        WARNING:  nonstandard use of \\ in a string literal
245    */
246    sql_query(mdb, "set standard_conforming_strings=on");
247
248    /* check that encoding is SQL_ASCII */
249    check_database_encoding(jcr, mdb);
250
251    V(mutex);
252    return 1;
253 }
254
255 void
256 db_close_database(JCR *jcr, B_DB *mdb)
257 {
258    if (!mdb) {
259       return;
260    }
261    db_end_transaction(jcr, mdb);
262    P(mutex);
263    sql_free_result(mdb);
264    mdb->ref_count--;
265    if (mdb->ref_count == 0) {
266       db_list->remove(mdb);
267       if (mdb->connected && mdb->db) {
268          sql_close(mdb);
269       }
270       rwl_destroy(&mdb->lock);
271       free_pool_memory(mdb->errmsg);
272       free_pool_memory(mdb->cmd);
273       free_pool_memory(mdb->cached_path);
274       free_pool_memory(mdb->fname);
275       free_pool_memory(mdb->path);
276       free_pool_memory(mdb->esc_name);
277       free_pool_memory(mdb->esc_path);
278       if (mdb->db_name) {
279          free(mdb->db_name);
280       }
281       if (mdb->db_user) {
282          free(mdb->db_user);
283       }
284       if (mdb->db_password) {
285          free(mdb->db_password);
286       }
287       if (mdb->db_address) {
288          free(mdb->db_address);
289       }
290       if (mdb->db_socket) {
291          free(mdb->db_socket);
292       }
293       free(mdb);
294       if (db_list->size() == 0) {
295          delete db_list;
296          db_list = NULL;
297       }
298    }
299    V(mutex);
300 }
301
302 void db_check_backend_thread_safe()
303 {
304 #ifdef HAVE_BATCH_FILE_INSERT
305 # ifdef HAVE_PQISTHREADSAFE 
306    if (!PQisthreadsafe()) {
307       Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
308                           "when using BatchMode.\n"));
309    }
310 # endif
311 #endif
312 }
313
314 void db_thread_cleanup()
315 { }
316
317 /*
318  * Return the next unique index (auto-increment) for
319  * the given table.  Return NULL on error.
320  *
321  * For PostgreSQL, NULL causes the auto-increment value
322  *  to be updated.
323  */
324 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
325 {
326    strcpy(index, "NULL");
327    return 1;
328 }
329
330
331 /*
332  * Escape strings so that PostgreSQL is happy
333  *
334  *   NOTE! len is the length of the old string. Your new
335  *         string must be long enough (max 2*old+1) to hold
336  *         the escaped output.
337  */
338 void
339 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
340 {
341    int error;
342   
343    PQescapeStringConn(mdb->db, snew, old, len, &error);
344    if (error) {
345       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
346       /* error on encoding, probably invalid multibyte encoding in the source string
347         see PQescapeStringConn documentation for details. */
348       Dmsg0(500, "PQescapeStringConn failed\n");
349    }
350 }
351
352 /*
353  * Submit a general SQL command (cmd), and for each row returned,
354  *  the sqlite_handler is called with the ctx.
355  */
356 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
357 {
358    SQL_ROW row;
359
360    Dmsg0(500, "db_sql_query started\n");
361
362    db_lock(mdb);
363    if (sql_query(mdb, query) != 0) {
364       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
365       db_unlock(mdb);
366       Dmsg0(500, "db_sql_query failed\n");
367       return false;
368    }
369    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
370
371    if (result_handler != NULL) {
372       Dmsg0(500, "db_sql_query invoking handler\n");
373       if ((mdb->result = sql_store_result(mdb)) != NULL) {
374          int num_fields = sql_num_fields(mdb);
375
376          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
377          while ((row = sql_fetch_row(mdb)) != NULL) {
378
379             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
380             if (result_handler(ctx, num_fields, row))
381                break;
382          }
383
384         sql_free_result(mdb);
385       }
386    }
387    db_unlock(mdb);
388
389    Dmsg0(500, "db_sql_query finished\n");
390
391    return true;
392 }
393
394
395
396 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
397 {
398    int j;
399    POSTGRESQL_ROW row = NULL; // by default, return NULL
400
401    Dmsg0(500, "my_postgresql_fetch_row start\n");
402
403    if (!mdb->row || mdb->row_size < mdb->num_fields) {
404       int num_fields = mdb->num_fields;
405       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
406
407       if (mdb->row) {
408          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
409          free(mdb->row);
410       }
411       num_fields += 20;                  /* add a bit extra */
412       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
413       mdb->row_size = num_fields;
414
415       // now reset the row_number now that we have the space allocated
416       mdb->row_number = 0;
417    }
418
419    // if still within the result set
420    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
421       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
422       // get each value from this row
423       for (j = 0; j < mdb->num_fields; j++) {
424          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
425          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
426       }
427       // increment the row number for the next call
428       mdb->row_number++;
429
430       row = mdb->row;
431    } else {
432       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
433    }
434
435    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
436
437    return row;
438 }
439
440 int my_postgresql_max_length(B_DB *mdb, int field_num) {
441    //
442    // for a given column, find the max length
443    //
444    int max_length;
445    int i;
446    int this_length;
447
448    max_length = 0;
449    for (i = 0; i < mdb->num_rows; i++) {
450       if (PQgetisnull(mdb->result, i, field_num)) {
451           this_length = 4;        // "NULL"
452       } else {
453           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
454       }
455
456       if (max_length < this_length) {
457           max_length = this_length;
458       }
459    }
460
461    return max_length;
462 }
463
464 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
465 {
466    int     i;
467
468    Dmsg0(500, "my_postgresql_fetch_field starts\n");
469
470    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
471       if (mdb->fields) {
472          free(mdb->fields);
473       }
474       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
475       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
476       mdb->fields_size = mdb->num_fields;
477
478       for (i = 0; i < mdb->num_fields; i++) {
479          Dmsg1(500, "filling field %d\n", i);
480          mdb->fields[i].name           = PQfname(mdb->result, i);
481          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
482          mdb->fields[i].type       = PQftype(mdb->result, i);
483          mdb->fields[i].flags      = 0;
484
485          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
486             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
487             mdb->fields[i].flags);
488       } // end for
489    } // end if
490
491    // increment field number for the next time around
492
493    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
494    return &mdb->fields[mdb->field_number++];
495 }
496
497 void my_postgresql_data_seek(B_DB *mdb, int row)
498 {
499    // set the row number to be returned on the next call
500    // to my_postgresql_fetch_row
501    mdb->row_number = row;
502 }
503
504 void my_postgresql_field_seek(B_DB *mdb, int field)
505 {
506    mdb->field_number = field;
507 }
508
509 /*
510  * Note, if this routine returns 1 (failure), Bacula expects
511  *  that no result has been stored.
512  * This is where QUERY_DB comes with Postgresql.
513  *
514  *  Returns:  0  on success
515  *            1  on failure
516  *
517  */
518 int my_postgresql_query(B_DB *mdb, const char *query)
519 {
520    Dmsg0(500, "my_postgresql_query started\n");
521    // We are starting a new query.  reset everything.
522    mdb->num_rows     = -1;
523    mdb->row_number   = -1;
524    mdb->field_number = -1;
525
526    if (mdb->result) {
527       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
528       mdb->result = NULL;
529    }
530
531    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
532
533    for (int i=0; i < 10; i++) {
534       mdb->result = PQexec(mdb->db, query);
535       if (mdb->result) {
536          break;
537       }
538       bmicrosleep(5, 0);
539    }
540    if (!mdb->result) {
541       Dmsg1(50, "Query failed: %s\n", query);
542       goto bail_out;
543    }
544
545    mdb->status = PQresultStatus(mdb->result);
546    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
547       Dmsg1(500, "we have a result\n", query);
548
549       // how many fields in the set?
550       mdb->num_fields = (int)PQnfields(mdb->result);
551       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
552
553       mdb->num_rows = PQntuples(mdb->result);
554       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
555
556       mdb->row_number = 0;      /* we can start to fetch something */
557       mdb->status = 0;          /* succeed */
558    } else {
559       Dmsg1(50, "Result status failed: %s\n", query);
560       goto bail_out;
561    }
562
563    Dmsg0(500, "my_postgresql_query finishing\n");
564    return mdb->status;
565
566 bail_out:
567    Dmsg1(500, "we failed\n", query);
568    PQclear(mdb->result);
569    mdb->result = NULL;
570    mdb->status = 1;                   /* failed */
571    return mdb->status;
572 }
573
574 void my_postgresql_free_result(B_DB *mdb)
575 {
576    
577    db_lock(mdb);
578    if (mdb->result) {
579       PQclear(mdb->result);
580       mdb->result = NULL;
581    }
582
583    if (mdb->row) {
584       free(mdb->row);
585       mdb->row = NULL;
586    }
587
588    if (mdb->fields) {
589       free(mdb->fields);
590       mdb->fields = NULL;
591    }
592    db_unlock(mdb);
593 }
594
595 static int my_postgresql_currval(B_DB *mdb, const char *table_name)
596 {
597    // Obtain the current value of the sequence that
598    // provides the serial value for primary key of the table.
599
600    // currval is local to our session.  It is not affected by
601    // other transactions.
602
603    // Determine the name of the sequence.
604    // PostgreSQL automatically creates a sequence using
605    // <table>_<column>_seq.
606    // At the time of writing, all tables used this format for
607    // for their primary key: <table>id
608    // Except for basefiles which has a primary key on baseid.
609    // Therefore, we need to special case that one table.
610
611    // everything else can use the PostgreSQL formula.
612
613    char      sequence[NAMEDATALEN-1];
614    char      query   [NAMEDATALEN+50];
615    PGresult *result;
616    int       id = 0;
617
618    if (strcasecmp(table_name, "basefiles") == 0) {
619       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
620    } else {
621       bstrncpy(sequence, table_name, sizeof(sequence));
622       bstrncat(sequence, "_",        sizeof(sequence));
623       bstrncat(sequence, table_name, sizeof(sequence));
624       bstrncat(sequence, "id",       sizeof(sequence));
625    }
626
627    bstrncat(sequence, "_seq", sizeof(sequence));
628    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
629
630    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
631    for (int i=0; i < 10; i++) {
632       result = PQexec(mdb->db, query);
633       if (result) {
634          break;
635       }
636       bmicrosleep(5, 0);
637    }
638    if (!result) {
639       Dmsg1(50, "Query failed: %s\n", query);
640       goto bail_out;
641    }
642
643    Dmsg0(500, "exec done");
644
645    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
646       Dmsg0(500, "getting value");
647       id = atoi(PQgetvalue(result, 0, 0));
648       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
649    } else {
650       Dmsg1(50, "Result status failed: %s\n", query);
651       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
652    }
653
654 bail_out:
655    PQclear(result);
656
657    return id;
658 }
659
660 int my_postgresql_insert_autokey_record(B_DB *mdb, const char *query, const char *table_name)
661 {
662    /*
663     * First execute the insert query and then retrieve the currval.
664     */
665    if (my_postgresql_query(mdb, query)) {
666       return 0;
667    }
668
669    mdb->num_rows = sql_affected_rows(mdb);
670    if (mdb->num_rows != 1) {
671       return 0;
672    }
673
674    mdb->changes++;
675
676    return my_postgresql_currval(mdb, table_name);
677 }
678
679 #ifdef HAVE_BATCH_FILE_INSERT
680
681 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
682 {
683    const char *query = "COPY batch FROM STDIN";
684
685    Dmsg0(500, "my_postgresql_batch_start started\n");
686
687    if (my_postgresql_query(mdb,
688                            "CREATE TEMPORARY TABLE batch ("
689                                "fileindex int,"
690                                "jobid int,"
691                                "path varchar,"
692                                "name varchar,"
693                                "lstat varchar,"
694                                "md5 varchar)") == 1)
695    {
696       Dmsg0(500, "my_postgresql_batch_start failed\n");
697       return 1;
698    }
699    
700    // We are starting a new query.  reset everything.
701    mdb->num_rows     = -1;
702    mdb->row_number   = -1;
703    mdb->field_number = -1;
704
705    my_postgresql_free_result(mdb);
706
707    for (int i=0; i < 10; i++) {
708       mdb->result = PQexec(mdb->db, query);
709       if (mdb->result) {
710          break;
711       }
712       bmicrosleep(5, 0);
713    }
714    if (!mdb->result) {
715       Dmsg1(50, "Query failed: %s\n", query);
716       goto bail_out;
717    }
718
719    mdb->status = PQresultStatus(mdb->result);
720    if (mdb->status == PGRES_COPY_IN) {
721       // how many fields in the set?
722       mdb->num_fields = (int) PQnfields(mdb->result);
723       mdb->num_rows   = 0;
724       mdb->status = 1;
725    } else {
726       Dmsg1(50, "Result status failed: %s\n", query);
727       goto bail_out;
728    }
729
730    Dmsg0(500, "my_postgresql_batch_start finishing\n");
731
732    return mdb->status;
733
734 bail_out:
735    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
736    mdb->status = 0;
737    PQclear(mdb->result);
738    mdb->result = NULL;
739    return mdb->status;
740 }
741
742 /* set error to something to abort operation */
743 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
744 {
745    int res;
746    int count=30;
747    PGresult *result;
748    Dmsg0(500, "my_postgresql_batch_end started\n");
749
750    if (!mdb) {                  /* no files ? */
751       return 0;
752    }
753
754    do { 
755       res = PQputCopyEnd(mdb->db, error);
756    } while (res == 0 && --count > 0);
757
758    if (res == 1) {
759       Dmsg0(500, "ok\n");
760       mdb->status = 1;
761    }
762    
763    if (res <= 0) {
764       Dmsg0(500, "we failed\n");
765       mdb->status = 0;
766       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
767    }
768
769    /* Check command status and return to normal libpq state */
770    result = PQgetResult(mdb->db);
771    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
772       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
773       mdb->status = 0;
774    }
775    PQclear(result); 
776
777    Dmsg0(500, "my_postgresql_batch_end finishing\n");
778
779    return mdb->status;
780 }
781
782 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
783 {
784    int res;
785    int count=30;
786    size_t len;
787    const char *digest;
788    char ed1[50];
789
790    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
791    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
792
793    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
794    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
795
796    if (ar->Digest == NULL || ar->Digest[0] == 0) {
797       digest = "0";
798    } else {
799       digest = ar->Digest;
800    }
801
802    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
803               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
804               mdb->esc_name, ar->attr, digest);
805
806    do { 
807       res = PQputCopyData(mdb->db,
808                           mdb->cmd,
809                           len);
810    } while (res == 0 && --count > 0);
811
812    if (res == 1) {
813       Dmsg0(500, "ok\n");
814       mdb->changes++;
815       mdb->status = 1;
816    }
817
818    if (res <= 0) {
819       Dmsg0(500, "we failed\n");
820       mdb->status = 0;
821       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
822    }
823
824    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
825
826    return mdb->status;
827 }
828
829 #endif /* HAVE_BATCH_FILE_INSERT */
830
831 /*
832  * Escape strings so that PostgreSQL is happy on COPY
833  *
834  *   NOTE! len is the length of the old string. Your new
835  *         string must be long enough (max 2*old+1) to hold
836  *         the escaped output.
837  */
838 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
839 {
840    /* we have to escape \t, \n, \r, \ */
841    char c = '\0' ;
842
843    while (len > 0 && *src) {
844       switch (*src) {
845       case '\n':
846          c = 'n';
847          break;
848       case '\\':
849          c = '\\';
850          break;
851       case '\t':
852          c = 't';
853          break;
854       case '\r':
855          c = 'r';
856          break;
857       default:
858          c = '\0' ;
859       }
860
861       if (c) {
862          *dest = '\\';
863          dest++;
864          *dest = c;
865       } else {
866          *dest = *src;
867       }
868
869       len--;
870       src++;
871       dest++;
872    }
873
874    *dest = '\0';
875    return dest;
876 }
877
878 #ifdef HAVE_BATCH_FILE_INSERT
879 const char *my_pg_batch_lock_path_query = 
880    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
881
882
883 const char *my_pg_batch_lock_filename_query = 
884    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
885
886 const char *my_pg_batch_unlock_tables_query = "COMMIT";
887
888 const char *my_pg_batch_fill_path_query = 
889    "INSERT INTO Path (Path) "
890     "SELECT a.Path FROM "
891      "(SELECT DISTINCT Path FROM batch) AS a "
892       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
893
894
895 const char *my_pg_batch_fill_filename_query = 
896    "INSERT INTO Filename (Name) "
897     "SELECT a.Name FROM "
898      "(SELECT DISTINCT Name FROM batch) as a "
899       "WHERE NOT EXISTS "
900        "(SELECT Name FROM Filename WHERE Name = a.Name)";
901 #endif /* HAVE_BATCH_FILE_INSERT */
902
903 #endif /* HAVE_POSTGRESQL */