]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Fix segfault when loading Plugins
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
51
52 /* -----------------------------------------------------------------------
53  *
54  *   PostgreSQL dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "PostgreSQL";
71
72 }
73
74 /*
75  * Initialize database data structure. In principal this should
76  * never have errors, or it is really fatal.
77  */
78 B_DB *
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80                  const char *db_address, int db_port, const char *db_socket,
81                  int mult_db_connections)
82 {
83    B_DB *mdb;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101       }
102    }
103    Dmsg0(100, "db_open first time\n");
104    mdb = (B_DB *)malloc(sizeof(B_DB));
105    memset(mdb, 0, sizeof(B_DB));
106    mdb->db_name = bstrdup(db_name);
107    mdb->db_user = bstrdup(db_user);
108    if (db_password) {
109       mdb->db_password = bstrdup(db_password);
110    }
111    if (db_address) {
112       mdb->db_address  = bstrdup(db_address);
113    }
114    if (db_socket) {
115       mdb->db_socket   = bstrdup(db_socket);
116    }
117    mdb->db_port        = db_port;
118    mdb->have_insert_id = TRUE;
119    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
120    *mdb->errmsg        = 0;
121    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
122    mdb->cached_path    = get_pool_memory(PM_FNAME);
123    mdb->cached_path_id = 0;
124    mdb->ref_count      = 1;
125    mdb->fname          = get_pool_memory(PM_FNAME);
126    mdb->path           = get_pool_memory(PM_FNAME);
127    mdb->esc_name       = get_pool_memory(PM_FNAME);
128    mdb->esc_path      = get_pool_memory(PM_FNAME);
129    mdb->allow_transactions = mult_db_connections;
130    qinsert(&db_list, &mdb->bq);            /* put db in list */
131    V(mutex);
132    return mdb;
133 }
134
135 /* Check that the database correspond to the encoding we want */
136 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
137 {
138    SQL_ROW row;
139    int ret=false;
140
141    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
142       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
143       return false;
144    }
145
146    if ((row = sql_fetch_row(mdb)) == NULL) {
147       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
148       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
149    } else {
150       ret = bstrcmp(row[0], "SQL_ASCII");
151
152       if (ret) {
153          /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
154          db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
155
156       } else {                  /* something is wrong with database encoding */
157          Mmsg(mdb->errmsg, 
158               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
159               mdb->db_name, row[0]);
160          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
161          Dmsg1(50, "%s", mdb->errmsg);
162       } 
163    }
164    return ret;
165 }
166
167 /*
168  * Now actually open the database.  This can generate errors,
169  *   which are returned in the errmsg
170  *
171  * DO NOT close the database or free(mdb) here !!!!
172  */
173 int
174 db_open_database(JCR *jcr, B_DB *mdb)
175 {
176    int errstat;
177    char buf[10], *port;
178
179    P(mutex);
180    if (mdb->connected) {
181       V(mutex);
182       return 1;
183    }
184    mdb->connected = false;
185
186    if ((errstat=rwl_init(&mdb->lock)) != 0) {
187       berrno be;
188       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
189             be.bstrerror(errstat));
190       V(mutex);
191       return 0;
192    }
193
194    if (mdb->db_port) {
195       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
196       port = buf;
197    } else {
198       port = NULL;
199    }
200
201    /* If connection fails, try at 5 sec intervals for 30 seconds. */
202    for (int retry=0; retry < 6; retry++) {
203       /* connect to the database */
204       mdb->db = PQsetdbLogin(
205            mdb->db_address,           /* default = localhost */
206            port,                      /* default port */
207            NULL,                      /* pg options */
208            NULL,                      /* tty, ignored */
209            mdb->db_name,              /* database name */
210            mdb->db_user,              /* login name */
211            mdb->db_password);         /* password */
212
213       /* If no connect, try once more in case it is a timing problem */
214       if (PQstatus(mdb->db) == CONNECTION_OK) {
215          break;
216       }
217       bmicrosleep(5, 0);
218    }
219
220    Dmsg0(50, "pg_real_connect done\n");
221    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
222             mdb->db_password==NULL?"(NULL)":mdb->db_password);
223
224    if (PQstatus(mdb->db) != CONNECTION_OK) {
225       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server. Database=%s User=%s\n"
226          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
227          mdb->db_name, mdb->db_user);
228       V(mutex);
229       return 0;
230    }
231
232    mdb->connected = true;
233
234    if (!check_tables_version(jcr, mdb)) {
235       V(mutex);
236       return 0;
237    }
238
239    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
240    
241    /* tell PostgreSQL we are using standard conforming strings
242       and avoid warnings such as:
243        WARNING:  nonstandard use of \\ in a string literal
244    */
245    sql_query(mdb, "set standard_conforming_strings=on");
246
247    /* check that encoding is SQL_ASCII */
248    check_database_encoding(jcr, mdb);
249
250    V(mutex);
251    return 1;
252 }
253
254 void
255 db_close_database(JCR *jcr, B_DB *mdb)
256 {
257    if (!mdb) {
258       return;
259    }
260    db_end_transaction(jcr, mdb);
261    P(mutex);
262    sql_free_result(mdb);
263    mdb->ref_count--;
264    if (mdb->ref_count == 0) {
265       qdchain(&mdb->bq);
266       if (mdb->connected && mdb->db) {
267          sql_close(mdb);
268       }
269       rwl_destroy(&mdb->lock);
270       free_pool_memory(mdb->errmsg);
271       free_pool_memory(mdb->cmd);
272       free_pool_memory(mdb->cached_path);
273       free_pool_memory(mdb->fname);
274       free_pool_memory(mdb->path);
275       free_pool_memory(mdb->esc_name);
276       free_pool_memory(mdb->esc_path);
277       if (mdb->db_name) {
278          free(mdb->db_name);
279       }
280       if (mdb->db_user) {
281          free(mdb->db_user);
282       }
283       if (mdb->db_password) {
284          free(mdb->db_password);
285       }
286       if (mdb->db_address) {
287          free(mdb->db_address);
288       }
289       if (mdb->db_socket) {
290          free(mdb->db_socket);
291       }
292       free(mdb);
293    }
294    V(mutex);
295 }
296
297 void db_check_backend_thread_safe()
298 {
299 #ifdef HAVE_BATCH_FILE_INSERT
300 # ifdef HAVE_PQISTHREADSAFE 
301    if (!PQisthreadsafe()) {
302       Emsg0(M_ABORT, 0, _("Pg client library must be thread-safe "
303                           "when using BatchMode.\n"));
304    }
305 # endif
306 #endif
307 }
308
309 void db_thread_cleanup()
310 { }
311
312 /*
313  * Return the next unique index (auto-increment) for
314  * the given table.  Return NULL on error.
315  *
316  * For PostgreSQL, NULL causes the auto-increment value
317  *  to be updated.
318  */
319 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
320 {
321    strcpy(index, "NULL");
322    return 1;
323 }
324
325
326 /*
327  * Escape strings so that PostgreSQL is happy
328  *
329  *   NOTE! len is the length of the old string. Your new
330  *         string must be long enough (max 2*old+1) to hold
331  *         the escaped output.
332  */
333 void
334 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
335 {
336    int error;
337   
338    PQescapeStringConn(mdb->db, snew, old, len, &error);
339    if (error) {
340       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
341       /* error on encoding, probably invalid multibyte encoding in the source string
342         see PQescapeStringConn documentation for details. */
343       Dmsg0(500, "PQescapeStringConn failed\n");
344    }
345 }
346
347 /*
348  * Submit a general SQL command (cmd), and for each row returned,
349  *  the sqlite_handler is called with the ctx.
350  */
351 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
352 {
353    SQL_ROW row;
354
355    Dmsg0(500, "db_sql_query started\n");
356
357    db_lock(mdb);
358    if (sql_query(mdb, query) != 0) {
359       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
360       db_unlock(mdb);
361       Dmsg0(500, "db_sql_query failed\n");
362       return false;
363    }
364    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
365
366    if (result_handler != NULL) {
367       Dmsg0(500, "db_sql_query invoking handler\n");
368       if ((mdb->result = sql_store_result(mdb)) != NULL) {
369          int num_fields = sql_num_fields(mdb);
370
371          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
372          while ((row = sql_fetch_row(mdb)) != NULL) {
373
374             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
375             if (result_handler(ctx, num_fields, row))
376                break;
377          }
378
379         sql_free_result(mdb);
380       }
381    }
382    db_unlock(mdb);
383
384    Dmsg0(500, "db_sql_query finished\n");
385
386    return true;
387 }
388
389
390
391 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
392 {
393    int j;
394    POSTGRESQL_ROW row = NULL; // by default, return NULL
395
396    Dmsg0(500, "my_postgresql_fetch_row start\n");
397
398    if (!mdb->row || mdb->row_size < mdb->num_fields) {
399       int num_fields = mdb->num_fields;
400       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
401
402       if (mdb->row) {
403          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
404          free(mdb->row);
405       }
406       num_fields += 20;                  /* add a bit extra */
407       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
408       mdb->row_size = num_fields;
409
410       // now reset the row_number now that we have the space allocated
411       mdb->row_number = 0;
412    }
413
414    // if still within the result set
415    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
416       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
417       // get each value from this row
418       for (j = 0; j < mdb->num_fields; j++) {
419          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
420          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
421       }
422       // increment the row number for the next call
423       mdb->row_number++;
424
425       row = mdb->row;
426    } else {
427       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
428    }
429
430    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
431
432    return row;
433 }
434
435 int my_postgresql_max_length(B_DB *mdb, int field_num) {
436    //
437    // for a given column, find the max length
438    //
439    int max_length;
440    int i;
441    int this_length;
442
443    max_length = 0;
444    for (i = 0; i < mdb->num_rows; i++) {
445       if (PQgetisnull(mdb->result, i, field_num)) {
446           this_length = 4;        // "NULL"
447       } else {
448           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
449       }
450
451       if (max_length < this_length) {
452           max_length = this_length;
453       }
454    }
455
456    return max_length;
457 }
458
459 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
460 {
461    int     i;
462
463    Dmsg0(500, "my_postgresql_fetch_field starts\n");
464
465    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
466       if (mdb->fields) {
467          free(mdb->fields);
468       }
469       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
470       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
471       mdb->fields_size = mdb->num_fields;
472
473       for (i = 0; i < mdb->num_fields; i++) {
474          Dmsg1(500, "filling field %d\n", i);
475          mdb->fields[i].name           = PQfname(mdb->result, i);
476          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
477          mdb->fields[i].type       = PQftype(mdb->result, i);
478          mdb->fields[i].flags      = 0;
479
480          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
481             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
482             mdb->fields[i].flags);
483       } // end for
484    } // end if
485
486    // increment field number for the next time around
487
488    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
489    return &mdb->fields[mdb->field_number++];
490 }
491
492 void my_postgresql_data_seek(B_DB *mdb, int row)
493 {
494    // set the row number to be returned on the next call
495    // to my_postgresql_fetch_row
496    mdb->row_number = row;
497 }
498
499 void my_postgresql_field_seek(B_DB *mdb, int field)
500 {
501    mdb->field_number = field;
502 }
503
504 /*
505  * Note, if this routine returns 1 (failure), Bacula expects
506  *  that no result has been stored.
507  * This is where QUERY_DB comes with Postgresql.
508  *
509  *  Returns:  0  on success
510  *            1  on failure
511  *
512  */
513 int my_postgresql_query(B_DB *mdb, const char *query)
514 {
515    Dmsg0(500, "my_postgresql_query started\n");
516    // We are starting a new query.  reset everything.
517    mdb->num_rows     = -1;
518    mdb->row_number   = -1;
519    mdb->field_number = -1;
520
521    if (mdb->result) {
522       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
523       mdb->result = NULL;
524    }
525
526    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
527
528    for (int i=0; i < 10; i++) {
529       mdb->result = PQexec(mdb->db, query);
530       if (mdb->result) {
531          break;
532       }
533       bmicrosleep(5, 0);
534    }
535    if (!mdb->result) {
536       Dmsg1(50, "Query failed: %s\n", query);
537       goto bail_out;
538    }
539
540    mdb->status = PQresultStatus(mdb->result);
541    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
542       Dmsg1(500, "we have a result\n", query);
543
544       // how many fields in the set?
545       mdb->num_fields = (int)PQnfields(mdb->result);
546       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
547
548       mdb->num_rows = PQntuples(mdb->result);
549       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
550
551       mdb->row_number = 0;      /* we can start to fetch something */
552       mdb->status = 0;          /* succeed */
553    } else {
554       Dmsg1(50, "Result status failed: %s\n", query);
555       goto bail_out;
556    }
557
558    Dmsg0(500, "my_postgresql_query finishing\n");
559    return mdb->status;
560
561 bail_out:
562    Dmsg1(500, "we failed\n", query);
563    PQclear(mdb->result);
564    mdb->result = NULL;
565    mdb->status = 1;                   /* failed */
566    return mdb->status;
567 }
568
569 void my_postgresql_free_result(B_DB *mdb)
570 {
571    
572    db_lock(mdb);
573    if (mdb->result) {
574       PQclear(mdb->result);
575       mdb->result = NULL;
576    }
577
578    if (mdb->row) {
579       free(mdb->row);
580       mdb->row = NULL;
581    }
582
583    if (mdb->fields) {
584       free(mdb->fields);
585       mdb->fields = NULL;
586    }
587    db_unlock(mdb);
588 }
589
590 int my_postgresql_currval(B_DB *mdb, const char *table_name)
591 {
592    // Obtain the current value of the sequence that
593    // provides the serial value for primary key of the table.
594
595    // currval is local to our session.  It is not affected by
596    // other transactions.
597
598    // Determine the name of the sequence.
599    // PostgreSQL automatically creates a sequence using
600    // <table>_<column>_seq.
601    // At the time of writing, all tables used this format for
602    // for their primary key: <table>id
603    // Except for basefiles which has a primary key on baseid.
604    // Therefore, we need to special case that one table.
605
606    // everything else can use the PostgreSQL formula.
607
608    char      sequence[NAMEDATALEN-1];
609    char      query   [NAMEDATALEN+50];
610    PGresult *result;
611    int       id = 0;
612
613    if (strcasecmp(table_name, "basefiles") == 0) {
614       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
615    } else {
616       bstrncpy(sequence, table_name, sizeof(sequence));
617       bstrncat(sequence, "_",        sizeof(sequence));
618       bstrncat(sequence, table_name, sizeof(sequence));
619       bstrncat(sequence, "id",       sizeof(sequence));
620    }
621
622    bstrncat(sequence, "_seq", sizeof(sequence));
623    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
624
625    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
626    for (int i=0; i < 10; i++) {
627       result = PQexec(mdb->db, query);
628       if (result) {
629          break;
630       }
631       bmicrosleep(5, 0);
632    }
633    if (!result) {
634       Dmsg1(50, "Query failed: %s\n", query);
635       goto bail_out;
636    }
637
638    Dmsg0(500, "exec done");
639
640    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
641       Dmsg0(500, "getting value");
642       id = atoi(PQgetvalue(result, 0, 0));
643       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
644    } else {
645       Dmsg1(50, "Result status failed: %s\n", query);
646       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
647    }
648
649 bail_out:
650    PQclear(result);
651
652    return id;
653 }
654
655 #ifdef HAVE_BATCH_FILE_INSERT
656
657 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
658 {
659    const char *query = "COPY batch FROM STDIN";
660
661    Dmsg0(500, "my_postgresql_batch_start started\n");
662
663    if (my_postgresql_query(mdb,
664                            "CREATE TEMPORARY TABLE batch ("
665                                "fileindex int,"
666                                "jobid int,"
667                                "path varchar,"
668                                "name varchar,"
669                                "lstat varchar,"
670                                "md5 varchar)") == 1)
671    {
672       Dmsg0(500, "my_postgresql_batch_start failed\n");
673       return 1;
674    }
675    
676    // We are starting a new query.  reset everything.
677    mdb->num_rows     = -1;
678    mdb->row_number   = -1;
679    mdb->field_number = -1;
680
681    my_postgresql_free_result(mdb);
682
683    for (int i=0; i < 10; i++) {
684       mdb->result = PQexec(mdb->db, query);
685       if (mdb->result) {
686          break;
687       }
688       bmicrosleep(5, 0);
689    }
690    if (!mdb->result) {
691       Dmsg1(50, "Query failed: %s\n", query);
692       goto bail_out;
693    }
694
695    mdb->status = PQresultStatus(mdb->result);
696    if (mdb->status == PGRES_COPY_IN) {
697       // how many fields in the set?
698       mdb->num_fields = (int) PQnfields(mdb->result);
699       mdb->num_rows   = 0;
700       mdb->status = 1;
701    } else {
702       Dmsg1(50, "Result status failed: %s\n", query);
703       goto bail_out;
704    }
705
706    Dmsg0(500, "my_postgresql_batch_start finishing\n");
707
708    return mdb->status;
709
710 bail_out:
711    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
712    mdb->status = 0;
713    PQclear(mdb->result);
714    mdb->result = NULL;
715    return mdb->status;
716 }
717
718 /* set error to something to abort operation */
719 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
720 {
721    int res;
722    int count=30;
723    PGresult *result;
724    Dmsg0(500, "my_postgresql_batch_end started\n");
725
726    if (!mdb) {                  /* no files ? */
727       return 0;
728    }
729
730    do { 
731       res = PQputCopyEnd(mdb->db, error);
732    } while (res == 0 && --count > 0);
733
734    if (res == 1) {
735       Dmsg0(500, "ok\n");
736       mdb->status = 1;
737    }
738    
739    if (res <= 0) {
740       Dmsg0(500, "we failed\n");
741       mdb->status = 0;
742       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
743    }
744
745    /* Check command status and return to normal libpq state */
746    result = PQgetResult(mdb->db);
747    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
748       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
749       mdb->status = 0;
750    }
751    PQclear(result); 
752
753    Dmsg0(500, "my_postgresql_batch_end finishing\n");
754
755    return mdb->status;
756 }
757
758 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
759 {
760    int res;
761    int count=30;
762    size_t len;
763    const char *digest;
764    char ed1[50];
765
766    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
767    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
768
769    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
770    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
771
772    if (ar->Digest == NULL || ar->Digest[0] == 0) {
773       digest = "0";
774    } else {
775       digest = ar->Digest;
776    }
777
778    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
779               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
780               mdb->esc_name, ar->attr, digest);
781
782    do { 
783       res = PQputCopyData(mdb->db,
784                           mdb->cmd,
785                           len);
786    } while (res == 0 && --count > 0);
787
788    if (res == 1) {
789       Dmsg0(500, "ok\n");
790       mdb->changes++;
791       mdb->status = 1;
792    }
793
794    if (res <= 0) {
795       Dmsg0(500, "we failed\n");
796       mdb->status = 0;
797       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
798    }
799
800    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
801
802    return mdb->status;
803 }
804
805 #endif /* HAVE_BATCH_FILE_INSERT */
806
807 /*
808  * Escape strings so that PostgreSQL is happy on COPY
809  *
810  *   NOTE! len is the length of the old string. Your new
811  *         string must be long enough (max 2*old+1) to hold
812  *         the escaped output.
813  */
814 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
815 {
816    /* we have to escape \t, \n, \r, \ */
817    char c = '\0' ;
818
819    while (len > 0 && *src) {
820       switch (*src) {
821       case '\n':
822          c = 'n';
823          break;
824       case '\\':
825          c = '\\';
826          break;
827       case '\t':
828          c = 't';
829          break;
830       case '\r':
831          c = 'r';
832          break;
833       default:
834          c = '\0' ;
835       }
836
837       if (c) {
838          *dest = '\\';
839          dest++;
840          *dest = c;
841       } else {
842          *dest = *src;
843       }
844
845       len--;
846       src++;
847       dest++;
848    }
849
850    *dest = '\0';
851    return dest;
852 }
853
854 #ifdef HAVE_BATCH_FILE_INSERT
855 const char *my_pg_batch_lock_path_query = 
856    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
857
858
859 const char *my_pg_batch_lock_filename_query = 
860    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
861
862 const char *my_pg_batch_unlock_tables_query = "COMMIT";
863
864 const char *my_pg_batch_fill_path_query = 
865    "INSERT INTO Path (Path) "
866     "SELECT a.Path FROM "
867      "(SELECT DISTINCT Path FROM batch) AS a "
868       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
869
870
871 const char *my_pg_batch_fill_filename_query = 
872    "INSERT INTO Filename (Name) "
873     "SELECT a.Name FROM "
874      "(SELECT DISTINCT Name FROM batch) as a "
875       "WHERE NOT EXISTS "
876        "(SELECT Name FROM Filename WHERE Name = a.Name)";
877 #endif /* HAVE_BATCH_FILE_INSERT */
878
879 #endif /* HAVE_POSTGRESQL */