]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
fix variable name
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50 #include "pg_config_manual.h"   /* get NAMEDATALEN on version 8.3 or later */
51
52 /* -----------------------------------------------------------------------
53  *
54  *   PostgreSQL dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "PostgreSQL";
71
72 }
73
74 /*
75  * Initialize database data structure. In principal this should
76  * never have errors, or it is really fatal.
77  */
78 B_DB *
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80                  const char *db_address, int db_port, const char *db_socket,
81                  int mult_db_connections)
82 {
83    B_DB *mdb;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101       }
102    }
103    Dmsg0(100, "db_open first time\n");
104    mdb = (B_DB *)malloc(sizeof(B_DB));
105    memset(mdb, 0, sizeof(B_DB));
106    mdb->db_name = bstrdup(db_name);
107    mdb->db_user = bstrdup(db_user);
108    if (db_password) {
109       mdb->db_password = bstrdup(db_password);
110    }
111    if (db_address) {
112       mdb->db_address  = bstrdup(db_address);
113    }
114    if (db_socket) {
115       mdb->db_socket   = bstrdup(db_socket);
116    }
117    mdb->db_port        = db_port;
118    mdb->have_insert_id = TRUE;
119    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
120    *mdb->errmsg        = 0;
121    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
122    mdb->cached_path    = get_pool_memory(PM_FNAME);
123    mdb->cached_path_id = 0;
124    mdb->ref_count      = 1;
125    mdb->fname          = get_pool_memory(PM_FNAME);
126    mdb->path           = get_pool_memory(PM_FNAME);
127    mdb->esc_name       = get_pool_memory(PM_FNAME);
128    mdb->esc_path      = get_pool_memory(PM_FNAME);
129    mdb->allow_transactions = mult_db_connections;
130    qinsert(&db_list, &mdb->bq);            /* put db in list */
131    V(mutex);
132    return mdb;
133 }
134
135 /* Check that the database correspond to the encoding we want */
136 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
137 {
138    SQL_ROW row;
139    int ret=false;
140
141    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
142       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
143       return false;
144    }
145
146    if ((row = sql_fetch_row(mdb)) == NULL) {
147       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
148       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
149    } else {
150       ret = bstrcmp(row[0], "SQL_ASCII");
151
152       if (ret) {
153          /* if we are in SQL_ASCII, we can force the client_encoding to SQL_ASCII too */
154          db_sql_query(mdb, "SET client_encoding TO 'SQL_ASCII'", NULL, NULL);
155
156       } else {                  /* something is wrong with database encoding */
157          Mmsg(mdb->errmsg, 
158               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
159               mdb->db_name, row[0]);
160          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
161          Dmsg1(50, "%s", mdb->errmsg);
162       } 
163    }
164    return ret;
165 }
166
167 /*
168  * Now actually open the database.  This can generate errors,
169  *   which are returned in the errmsg
170  *
171  * DO NOT close the database or free(mdb) here !!!!
172  */
173 int
174 db_open_database(JCR *jcr, B_DB *mdb)
175 {
176    int errstat;
177    char buf[10], *port;
178
179 #ifdef xxx                      /* require libpq >= 8.2 */
180    if (!PQisthreadsafe()) {
181       Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
182            "PostgreSQL library is not thread safe. Cannot continue.\n"));
183    }
184 #endif
185    P(mutex);
186    if (mdb->connected) {
187       V(mutex);
188       return 1;
189    }
190    mdb->connected = false;
191
192    if ((errstat=rwl_init(&mdb->lock)) != 0) {
193       berrno be;
194       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
195             be.bstrerror(errstat));
196       V(mutex);
197       return 0;
198    }
199
200    if (mdb->db_port) {
201       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
202       port = buf;
203    } else {
204       port = NULL;
205    }
206
207    /* If connection fails, try at 5 sec intervals for 30 seconds. */
208    for (int retry=0; retry < 6; retry++) {
209       /* connect to the database */
210       mdb->db = PQsetdbLogin(
211            mdb->db_address,           /* default = localhost */
212            port,                      /* default port */
213            NULL,                      /* pg options */
214            NULL,                      /* tty, ignored */
215            mdb->db_name,              /* database name */
216            mdb->db_user,              /* login name */
217            mdb->db_password);         /* password */
218
219       /* If no connect, try once more in case it is a timing problem */
220       if (PQstatus(mdb->db) == CONNECTION_OK) {
221          break;
222       }
223       bmicrosleep(5, 0);
224    }
225
226    Dmsg0(50, "pg_real_connect done\n");
227    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
228             mdb->db_password==NULL?"(NULL)":mdb->db_password);
229
230    if (PQstatus(mdb->db) != CONNECTION_OK) {
231       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
232             "Database=%s User=%s\n"
233             "It is probably not running or your password is incorrect.\n"),
234              mdb->db_name, mdb->db_user);
235       V(mutex);
236       return 0;
237    }
238
239    mdb->connected = true;
240
241    if (!check_tables_version(jcr, mdb)) {
242       V(mutex);
243       return 0;
244    }
245
246    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
247    
248    /* tell PostgreSQL we are using standard conforming strings
249       and avoid warnings such as:
250        WARNING:  nonstandard use of \\ in a string literal
251    */
252    sql_query(mdb, "set standard_conforming_strings=on");
253
254    /* check that encoding is SQL_ASCII */
255    check_database_encoding(jcr, mdb);
256
257    V(mutex);
258    return 1;
259 }
260
261 void
262 db_close_database(JCR *jcr, B_DB *mdb)
263 {
264    if (!mdb) {
265       return;
266    }
267    db_end_transaction(jcr, mdb);
268    P(mutex);
269    sql_free_result(mdb);
270    mdb->ref_count--;
271    if (mdb->ref_count == 0) {
272       qdchain(&mdb->bq);
273       if (mdb->connected && mdb->db) {
274          sql_close(mdb);
275       }
276       rwl_destroy(&mdb->lock);
277       free_pool_memory(mdb->errmsg);
278       free_pool_memory(mdb->cmd);
279       free_pool_memory(mdb->cached_path);
280       free_pool_memory(mdb->fname);
281       free_pool_memory(mdb->path);
282       free_pool_memory(mdb->esc_name);
283       free_pool_memory(mdb->esc_path);
284       if (mdb->db_name) {
285          free(mdb->db_name);
286       }
287       if (mdb->db_user) {
288          free(mdb->db_user);
289       }
290       if (mdb->db_password) {
291          free(mdb->db_password);
292       }
293       if (mdb->db_address) {
294          free(mdb->db_address);
295       }
296       if (mdb->db_socket) {
297          free(mdb->db_socket);
298       }
299       free(mdb);
300    }
301    V(mutex);
302 }
303
304 void db_thread_cleanup()
305 { }
306
307 /*
308  * Return the next unique index (auto-increment) for
309  * the given table.  Return NULL on error.
310  *
311  * For PostgreSQL, NULL causes the auto-increment value
312  *  to be updated.
313  */
314 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
315 {
316    strcpy(index, "NULL");
317    return 1;
318 }
319
320
321 /*
322  * Escape strings so that PostgreSQL is happy
323  *
324  *   NOTE! len is the length of the old string. Your new
325  *         string must be long enough (max 2*old+1) to hold
326  *         the escaped output.
327  */
328 void
329 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
330 {
331    int error;
332   
333    PQescapeStringConn(mdb->db, snew, old, len, &error);
334    if (error) {
335       Jmsg(jcr, M_FATAL, 0, _("PQescapeStringConn returned non-zero.\n"));
336       /* error on encoding, probably invalid multibyte encoding in the source string
337         see PQescapeStringConn documentation for details. */
338       Dmsg0(500, "PQescapeStringConn failed\n");
339    }
340 }
341
342 /*
343  * Submit a general SQL command (cmd), and for each row returned,
344  *  the sqlite_handler is called with the ctx.
345  */
346 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
347 {
348    SQL_ROW row;
349
350    Dmsg0(500, "db_sql_query started\n");
351
352    db_lock(mdb);
353    if (sql_query(mdb, query) != 0) {
354       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
355       db_unlock(mdb);
356       Dmsg0(500, "db_sql_query failed\n");
357       return false;
358    }
359    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
360
361    if (result_handler != NULL) {
362       Dmsg0(500, "db_sql_query invoking handler\n");
363       if ((mdb->result = sql_store_result(mdb)) != NULL) {
364          int num_fields = sql_num_fields(mdb);
365
366          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
367          while ((row = sql_fetch_row(mdb)) != NULL) {
368
369             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
370             if (result_handler(ctx, num_fields, row))
371                break;
372          }
373
374         sql_free_result(mdb);
375       }
376    }
377    db_unlock(mdb);
378
379    Dmsg0(500, "db_sql_query finished\n");
380
381    return true;
382 }
383
384
385
386 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
387 {
388    int j;
389    POSTGRESQL_ROW row = NULL; // by default, return NULL
390
391    Dmsg0(500, "my_postgresql_fetch_row start\n");
392
393    if (!mdb->row || mdb->row_size < mdb->num_fields) {
394       int num_fields = mdb->num_fields;
395       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
396
397       if (mdb->row) {
398          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
399          free(mdb->row);
400       }
401       num_fields += 20;                  /* add a bit extra */
402       mdb->row = (POSTGRESQL_ROW)malloc(sizeof(char *) * num_fields);
403       mdb->row_size = num_fields;
404
405       // now reset the row_number now that we have the space allocated
406       mdb->row_number = 0;
407    }
408
409    // if still within the result set
410    if (mdb->row_number >= 0 && mdb->row_number < mdb->num_rows) {
411       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
412       // get each value from this row
413       for (j = 0; j < mdb->num_fields; j++) {
414          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
415          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
416       }
417       // increment the row number for the next call
418       mdb->row_number++;
419
420       row = mdb->row;
421    } else {
422       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
423    }
424
425    Dmsg1(500, "my_postgresql_fetch_row finishes returning %p\n", row);
426
427    return row;
428 }
429
430 int my_postgresql_max_length(B_DB *mdb, int field_num) {
431    //
432    // for a given column, find the max length
433    //
434    int max_length;
435    int i;
436    int this_length;
437
438    max_length = 0;
439    for (i = 0; i < mdb->num_rows; i++) {
440       if (PQgetisnull(mdb->result, i, field_num)) {
441           this_length = 4;        // "NULL"
442       } else {
443           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
444       }
445
446       if (max_length < this_length) {
447           max_length = this_length;
448       }
449    }
450
451    return max_length;
452 }
453
454 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
455 {
456    int     i;
457
458    Dmsg0(500, "my_postgresql_fetch_field starts\n");
459
460    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
461       if (mdb->fields) {
462          free(mdb->fields);
463       }
464       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
465       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
466       mdb->fields_size = mdb->num_fields;
467
468       for (i = 0; i < mdb->num_fields; i++) {
469          Dmsg1(500, "filling field %d\n", i);
470          mdb->fields[i].name           = PQfname(mdb->result, i);
471          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
472          mdb->fields[i].type       = PQftype(mdb->result, i);
473          mdb->fields[i].flags      = 0;
474
475          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
476             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
477             mdb->fields[i].flags);
478       } // end for
479    } // end if
480
481    // increment field number for the next time around
482
483    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
484    return &mdb->fields[mdb->field_number++];
485 }
486
487 void my_postgresql_data_seek(B_DB *mdb, int row)
488 {
489    // set the row number to be returned on the next call
490    // to my_postgresql_fetch_row
491    mdb->row_number = row;
492 }
493
494 void my_postgresql_field_seek(B_DB *mdb, int field)
495 {
496    mdb->field_number = field;
497 }
498
499 /*
500  * Note, if this routine returns 1 (failure), Bacula expects
501  *  that no result has been stored.
502  * This is where QUERY_DB comes with Postgresql.
503  *
504  *  Returns:  0  on success
505  *            1  on failure
506  *
507  */
508 int my_postgresql_query(B_DB *mdb, const char *query)
509 {
510    Dmsg0(500, "my_postgresql_query started\n");
511    // We are starting a new query.  reset everything.
512    mdb->num_rows     = -1;
513    mdb->row_number   = -1;
514    mdb->field_number = -1;
515
516    if (mdb->result) {
517       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
518       mdb->result = NULL;
519    }
520
521    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
522
523    for (int i=0; i < 10; i++) {
524       mdb->result = PQexec(mdb->db, query);
525       if (mdb->result) {
526          break;
527       }
528       bmicrosleep(5, 0);
529    }
530    if (!mdb->result) {
531       Dmsg1(50, "Query failed: %s\n", query);
532       goto bail_out;
533    }
534
535    mdb->status = PQresultStatus(mdb->result);
536    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
537       Dmsg1(500, "we have a result\n", query);
538
539       // how many fields in the set?
540       mdb->num_fields = (int)PQnfields(mdb->result);
541       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
542
543       mdb->num_rows = PQntuples(mdb->result);
544       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
545
546       mdb->row_number = 0;      /* we can start to fetch something */
547       mdb->status = 0;          /* succeed */
548    } else {
549       Dmsg1(50, "Result status failed: %s\n", query);
550       goto bail_out;
551    }
552
553    Dmsg0(500, "my_postgresql_query finishing\n");
554    return mdb->status;
555
556 bail_out:
557    Dmsg1(500, "we failed\n", query);
558    PQclear(mdb->result);
559    mdb->result = NULL;
560    mdb->status = 1;                   /* failed */
561    return mdb->status;
562 }
563
564 void my_postgresql_free_result(B_DB *mdb)
565 {
566    
567    db_lock(mdb);
568    if (mdb->result) {
569       PQclear(mdb->result);
570       mdb->result = NULL;
571    }
572
573    if (mdb->row) {
574       free(mdb->row);
575       mdb->row = NULL;
576    }
577
578    if (mdb->fields) {
579       free(mdb->fields);
580       mdb->fields = NULL;
581    }
582    db_unlock(mdb);
583 }
584
585 int my_postgresql_currval(B_DB *mdb, const char *table_name)
586 {
587    // Obtain the current value of the sequence that
588    // provides the serial value for primary key of the table.
589
590    // currval is local to our session.  It is not affected by
591    // other transactions.
592
593    // Determine the name of the sequence.
594    // PostgreSQL automatically creates a sequence using
595    // <table>_<column>_seq.
596    // At the time of writing, all tables used this format for
597    // for their primary key: <table>id
598    // Except for basefiles which has a primary key on baseid.
599    // Therefore, we need to special case that one table.
600
601    // everything else can use the PostgreSQL formula.
602
603    char      sequence[NAMEDATALEN-1];
604    char      query   [NAMEDATALEN+50];
605    PGresult *result;
606    int       id = 0;
607
608    if (strcasecmp(table_name, "basefiles") == 0) {
609       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
610    } else {
611       bstrncpy(sequence, table_name, sizeof(sequence));
612       bstrncat(sequence, "_",        sizeof(sequence));
613       bstrncat(sequence, table_name, sizeof(sequence));
614       bstrncat(sequence, "id",       sizeof(sequence));
615    }
616
617    bstrncat(sequence, "_seq", sizeof(sequence));
618    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
619
620    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
621    for (int i=0; i < 10; i++) {
622       result = PQexec(mdb->db, query);
623       if (result) {
624          break;
625       }
626       bmicrosleep(5, 0);
627    }
628    if (!result) {
629       Dmsg1(50, "Query failed: %s\n", query);
630       goto bail_out;
631    }
632
633    Dmsg0(500, "exec done");
634
635    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
636       Dmsg0(500, "getting value");
637       id = atoi(PQgetvalue(result, 0, 0));
638       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
639    } else {
640       Dmsg1(50, "Result status failed: %s\n", query);
641       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
642    }
643
644 bail_out:
645    PQclear(result);
646
647    return id;
648 }
649
650 #ifdef HAVE_BATCH_FILE_INSERT
651
652 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
653 {
654    const char *query = "COPY batch FROM STDIN";
655
656    Dmsg0(500, "my_postgresql_batch_start started\n");
657
658    if (my_postgresql_query(mdb,
659                            "CREATE TEMPORARY TABLE batch ("
660                                "fileindex int,"
661                                "jobid int,"
662                                "path varchar,"
663                                "name varchar,"
664                                "lstat varchar,"
665                                "md5 varchar)") == 1)
666    {
667       Dmsg0(500, "my_postgresql_batch_start failed\n");
668       return 1;
669    }
670    
671    // We are starting a new query.  reset everything.
672    mdb->num_rows     = -1;
673    mdb->row_number   = -1;
674    mdb->field_number = -1;
675
676    my_postgresql_free_result(mdb);
677
678    for (int i=0; i < 10; i++) {
679       mdb->result = PQexec(mdb->db, query);
680       if (mdb->result) {
681          break;
682       }
683       bmicrosleep(5, 0);
684    }
685    if (!mdb->result) {
686       Dmsg1(50, "Query failed: %s\n", query);
687       goto bail_out;
688    }
689
690    mdb->status = PQresultStatus(mdb->result);
691    if (mdb->status == PGRES_COPY_IN) {
692       // how many fields in the set?
693       mdb->num_fields = (int) PQnfields(mdb->result);
694       mdb->num_rows   = 0;
695       mdb->status = 1;
696    } else {
697       Dmsg1(50, "Result status failed: %s\n", query);
698       goto bail_out;
699    }
700
701    Dmsg0(500, "my_postgresql_batch_start finishing\n");
702
703    return mdb->status;
704
705 bail_out:
706    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
707    mdb->status = 0;
708    PQclear(mdb->result);
709    mdb->result = NULL;
710    return mdb->status;
711 }
712
713 /* set error to something to abort operation */
714 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
715 {
716    int res;
717    int count=30;
718    PGresult *result;
719    Dmsg0(500, "my_postgresql_batch_end started\n");
720
721    if (!mdb) {                  /* no files ? */
722       return 0;
723    }
724
725    do { 
726       res = PQputCopyEnd(mdb->db, error);
727    } while (res == 0 && --count > 0);
728
729    if (res == 1) {
730       Dmsg0(500, "ok\n");
731       mdb->status = 1;
732    }
733    
734    if (res <= 0) {
735       Dmsg0(500, "we failed\n");
736       mdb->status = 0;
737       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
738    }
739
740    /* Check command status and return to normal libpq state */
741    result = PQgetResult(mdb->db);
742    if (PQresultStatus(result) != PGRES_COMMAND_OK) {
743       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
744       mdb->status = 0;
745    }
746    PQclear(result); 
747
748    Dmsg0(500, "my_postgresql_batch_end finishing\n");
749
750    return mdb->status;
751 }
752
753 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
754 {
755    int res;
756    int count=30;
757    size_t len;
758    const char *digest;
759    char ed1[50];
760
761    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
762    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
763
764    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
765    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
766
767    if (ar->Digest == NULL || ar->Digest[0] == 0) {
768       digest = "0";
769    } else {
770       digest = ar->Digest;
771    }
772
773    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
774               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
775               mdb->esc_name, ar->attr, digest);
776
777    do { 
778       res = PQputCopyData(mdb->db,
779                           mdb->cmd,
780                           len);
781    } while (res == 0 && --count > 0);
782
783    if (res == 1) {
784       Dmsg0(500, "ok\n");
785       mdb->changes++;
786       mdb->status = 1;
787    }
788
789    if (res <= 0) {
790       Dmsg0(500, "we failed\n");
791       mdb->status = 0;
792       Mmsg1(&mdb->errmsg, _("error copying in batch mode: %s"), PQerrorMessage(mdb->db));
793    }
794
795    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
796
797    return mdb->status;
798 }
799
800 #endif /* HAVE_BATCH_FILE_INSERT */
801
802 /*
803  * Escape strings so that PostgreSQL is happy on COPY
804  *
805  *   NOTE! len is the length of the old string. Your new
806  *         string must be long enough (max 2*old+1) to hold
807  *         the escaped output.
808  */
809 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
810 {
811    /* we have to escape \t, \n, \r, \ */
812    char c = '\0' ;
813
814    while (len > 0 && *src) {
815       switch (*src) {
816       case '\n':
817          c = 'n';
818          break;
819       case '\\':
820          c = '\\';
821          break;
822       case '\t':
823          c = 't';
824          break;
825       case '\r':
826          c = 'r';
827          break;
828       default:
829          c = '\0' ;
830       }
831
832       if (c) {
833          *dest = '\\';
834          dest++;
835          *dest = c;
836       } else {
837          *dest = *src;
838       }
839
840       len--;
841       src++;
842       dest++;
843    }
844
845    *dest = '\0';
846    return dest;
847 }
848
849 #ifdef HAVE_BATCH_FILE_INSERT
850 const char *my_pg_batch_lock_path_query = 
851    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
852
853
854 const char *my_pg_batch_lock_filename_query = 
855    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
856
857 const char *my_pg_batch_unlock_tables_query = "COMMIT";
858
859 const char *my_pg_batch_fill_path_query = 
860    "INSERT INTO Path (Path) "
861     "SELECT a.Path FROM "
862      "(SELECT DISTINCT Path FROM batch) AS a "
863       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
864
865
866 const char *my_pg_batch_fill_filename_query = 
867    "INSERT INTO Filename (Name) "
868     "SELECT a.Name FROM "
869      "(SELECT DISTINCT Name FROM batch) as a "
870       "WHERE NOT EXISTS "
871        "(SELECT Name FROM Filename WHERE Name = a.Name)";
872 #endif /* HAVE_BATCH_FILE_INSERT */
873
874 #endif /* HAVE_POSTGRESQL */