]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
kes Add some additional locking in the cats directory in subroutines
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static BQUEUE db_list = {&db_list, &db_list};
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (!mult_db_connections) {
90       /* Look to see if DB already open */
91       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
92          if (bstrcmp(mdb->db_name, db_name) &&
93              bstrcmp(mdb->db_address, db_address) &&
94              mdb->db_port == db_port) {
95             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
96             mdb->ref_count++;
97             V(mutex);
98             return mdb;                  /* already open */
99          }
100       }
101    }
102    Dmsg0(100, "db_open first time\n");
103    mdb = (B_DB *)malloc(sizeof(B_DB));
104    memset(mdb, 0, sizeof(B_DB));
105    mdb->db_name = bstrdup(db_name);
106    mdb->db_user = bstrdup(db_user);
107    if (db_password) {
108       mdb->db_password = bstrdup(db_password);
109    }
110    if (db_address) {
111       mdb->db_address  = bstrdup(db_address);
112    }
113    if (db_socket) {
114       mdb->db_socket   = bstrdup(db_socket);
115    }
116    mdb->db_port        = db_port;
117    mdb->have_insert_id = TRUE;
118    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
119    *mdb->errmsg        = 0;
120    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
121    mdb->cached_path    = get_pool_memory(PM_FNAME);
122    mdb->cached_path_id = 0;
123    mdb->ref_count      = 1;
124    mdb->fname          = get_pool_memory(PM_FNAME);
125    mdb->path           = get_pool_memory(PM_FNAME);
126    mdb->esc_name       = get_pool_memory(PM_FNAME);
127    mdb->esc_path      = get_pool_memory(PM_FNAME);
128    mdb->allow_transactions = mult_db_connections;
129    qinsert(&db_list, &mdb->bq);            /* put db in list */
130    V(mutex);
131    return mdb;
132 }
133
134 /*
135  * Now actually open the database.  This can generate errors,
136  *   which are returned in the errmsg
137  *
138  * DO NOT close the database or free(mdb) here !!!!
139  */
140 int
141 db_open_database(JCR *jcr, B_DB *mdb)
142 {
143    int errstat;
144    char buf[10], *port;
145
146 #ifdef xxx
147    if (!PQisthreadsafe()) {
148       Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
149            "PostgreSQL library is not thread safe. Connot continue.\n"));
150    }
151 #endif
152    P(mutex);
153    if (mdb->connected) {
154       V(mutex);
155       return 1;
156    }
157    mdb->connected = false;
158
159    if ((errstat=rwl_init(&mdb->lock)) != 0) {
160       berrno be;
161       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
162             be.bstrerror(errstat));
163       V(mutex);
164       return 0;
165    }
166
167    if (mdb->db_port) {
168       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
169       port = buf;
170    } else {
171       port = NULL;
172    }
173
174    /* If connection fails, try at 5 sec intervals for 30 seconds. */
175    for (int retry=0; retry < 6; retry++) {
176       /* connect to the database */
177       mdb->db = PQsetdbLogin(
178            mdb->db_address,           /* default = localhost */
179            port,                      /* default port */
180            NULL,                      /* pg options */
181            NULL,                      /* tty, ignored */
182            mdb->db_name,              /* database name */
183            mdb->db_user,              /* login name */
184            mdb->db_password);         /* password */
185
186       /* If no connect, try once more in case it is a timing problem */
187       if (PQstatus(mdb->db) == CONNECTION_OK) {
188          break;
189       }
190       bmicrosleep(5, 0);
191    }
192
193    Dmsg0(50, "pg_real_connect done\n");
194    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
195             mdb->db_password==NULL?"(NULL)":mdb->db_password);
196
197    if (PQstatus(mdb->db) != CONNECTION_OK) {
198       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
199             "Database=%s User=%s\n"
200             "It is probably not running or your password is incorrect.\n"),
201              mdb->db_name, mdb->db_user);
202       V(mutex);
203       return 0;
204    }
205
206    mdb->connected = true;
207
208    if (!check_tables_version(jcr, mdb)) {
209       V(mutex);
210       return 0;
211    }
212
213    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
214
215    V(mutex);
216    return 1;
217 }
218
219 void
220 db_close_database(JCR *jcr, B_DB *mdb)
221 {
222    if (!mdb) {
223       return;
224    }
225    db_end_transaction(jcr, mdb);
226    P(mutex);
227    sql_free_result(mdb);
228    mdb->ref_count--;
229    if (mdb->ref_count == 0) {
230       qdchain(&mdb->bq);
231       if (mdb->connected && mdb->db) {
232          sql_close(mdb);
233       }
234       rwl_destroy(&mdb->lock);
235       free_pool_memory(mdb->errmsg);
236       free_pool_memory(mdb->cmd);
237       free_pool_memory(mdb->cached_path);
238       free_pool_memory(mdb->fname);
239       free_pool_memory(mdb->path);
240       free_pool_memory(mdb->esc_name);
241       free_pool_memory(mdb->esc_path);
242       if (mdb->db_name) {
243          free(mdb->db_name);
244       }
245       if (mdb->db_user) {
246          free(mdb->db_user);
247       }
248       if (mdb->db_password) {
249          free(mdb->db_password);
250       }
251       if (mdb->db_address) {
252          free(mdb->db_address);
253       }
254       if (mdb->db_socket) {
255          free(mdb->db_socket);
256       }
257       my_postgresql_free_result(mdb);
258       free(mdb);
259    }
260    V(mutex);
261 }
262
263 void db_thread_cleanup()
264 { }
265
266 /*
267  * Return the next unique index (auto-increment) for
268  * the given table.  Return NULL on error.
269  *
270  * For PostgreSQL, NULL causes the auto-increment value
271  *  to be updated.
272  */
273 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
274 {
275    strcpy(index, "NULL");
276    return 1;
277 }
278
279
280 /*
281  * Escape strings so that PostgreSQL is happy
282  *
283  *   NOTE! len is the length of the old string. Your new
284  *         string must be long enough (max 2*old+1) to hold
285  *         the escaped output.
286  */
287 void
288 db_escape_string(char *snew, char *old, int len)
289 {
290    PQescapeString(snew, old, len);
291 }
292
293 /*
294  * Submit a general SQL command (cmd), and for each row returned,
295  *  the sqlite_handler is called with the ctx.
296  */
297 int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
298 {
299    SQL_ROW row;
300
301    Dmsg0(500, "db_sql_query started\n");
302
303    db_lock(mdb);
304    if (sql_query(mdb, query) != 0) {
305       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
306       db_unlock(mdb);
307       Dmsg0(500, "db_sql_query failed\n");
308       return 0;
309    }
310    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
311
312    if (result_handler != NULL) {
313       Dmsg0(500, "db_sql_query invoking handler\n");
314       if ((mdb->result = sql_store_result(mdb)) != NULL) {
315          int num_fields = sql_num_fields(mdb);
316
317          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
318          while ((row = sql_fetch_row(mdb)) != NULL) {
319
320             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
321             if (result_handler(ctx, num_fields, row))
322                break;
323          }
324
325         sql_free_result(mdb);
326       }
327    }
328    db_unlock(mdb);
329
330    Dmsg0(500, "db_sql_query finished\n");
331
332    return 1;
333 }
334
335
336
337 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
338 {
339    int j;
340    POSTGRESQL_ROW row = NULL; // by default, return NULL
341
342    Dmsg0(500, "my_postgresql_fetch_row start\n");
343
344    if (!mdb->row || mdb->row_size < mdb->num_fields) {
345       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
346
347       if (mdb->row) {
348          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
349          free(mdb->row);
350       }
351       mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields);
352       mdb->row_size = mdb->num_fields;
353
354       // now reset the row_number now that we have the space allocated
355       mdb->row_number = 0;
356    }
357
358    // if still within the result set
359    if (mdb->row_number < mdb->num_rows) {
360       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
361       // get each value from this row
362       for (j = 0; j < mdb->num_fields; j++) {
363          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
364          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
365       }
366       // increment the row number for the next call
367       mdb->row_number++;
368
369       row = mdb->row;
370    } else {
371       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
372    }
373
374    Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
375
376    return row;
377 }
378
379 int my_postgresql_max_length(B_DB *mdb, int field_num) {
380    //
381    // for a given column, find the max length
382    //
383    int max_length;
384    int i;
385    int this_length;
386
387    max_length = 0;
388    for (i = 0; i < mdb->num_rows; i++) {
389       if (PQgetisnull(mdb->result, i, field_num)) {
390           this_length = 4;        // "NULL"
391       } else {
392           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
393       }
394
395       if (max_length < this_length) {
396           max_length = this_length;
397       }
398    }
399
400    return max_length;
401 }
402
403 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
404 {
405    int     i;
406
407    Dmsg0(500, "my_postgresql_fetch_field starts\n");
408
409    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
410       if (mdb->fields) {
411          free(mdb->fields);
412       }
413       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
414       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
415       mdb->fields_size = mdb->num_fields;
416
417       for (i = 0; i < mdb->num_fields; i++) {
418          Dmsg1(500, "filling field %d\n", i);
419          mdb->fields[i].name           = PQfname(mdb->result, i);
420          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
421          mdb->fields[i].type       = PQftype(mdb->result, i);
422          mdb->fields[i].flags      = 0;
423
424          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
425             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
426             mdb->fields[i].flags);
427       } // end for
428    } // end if
429
430    // increment field number for the next time around
431
432    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
433    return &mdb->fields[mdb->field_number++];
434 }
435
436 void my_postgresql_data_seek(B_DB *mdb, int row)
437 {
438    // set the row number to be returned on the next call
439    // to my_postgresql_fetch_row
440    mdb->row_number = row;
441 }
442
443 void my_postgresql_field_seek(B_DB *mdb, int field)
444 {
445    mdb->field_number = field;
446 }
447
448 /*
449  * Note, if this routine returns 1 (failure), Bacula expects
450  *  that no result has been stored.
451  * This is where QUERY_DB comes with Postgresql.
452  *
453  *  Returns:  0  on success
454  *            1  on failure
455  *
456  */
457 int my_postgresql_query(B_DB *mdb, const char *query)
458 {
459    Dmsg0(500, "my_postgresql_query started\n");
460    // We are starting a new query.  reset everything.
461    mdb->num_rows     = -1;
462    mdb->row_number   = -1;
463    mdb->field_number = -1;
464
465    if (mdb->result) {
466       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
467       mdb->result = NULL;
468    }
469
470    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
471
472    for (int i=0; i < 10; i++) {
473       mdb->result = PQexec(mdb->db, query);
474       if (mdb->result) {
475          break;
476       }
477       bmicrosleep(5, 0);
478    }
479    if (!mdb->result) {
480       Dmsg1(50, "Query failed: %s\n", query);
481       goto bail_out;
482    }
483
484    mdb->status = PQresultStatus(mdb->result);
485    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
486       Dmsg1(500, "we have a result\n", query);
487
488       // how many fields in the set?
489       mdb->num_fields = (int)PQnfields(mdb->result);
490       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
491
492       mdb->num_rows = PQntuples(mdb->result);
493       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
494
495       mdb->status = 0;                  /* succeed */
496    } else {
497       Dmsg1(50, "Result status failed: %s\n", query);
498       goto bail_out;
499    }
500
501    Dmsg0(500, "my_postgresql_query finishing\n");
502    return mdb->status;
503
504 bail_out:
505    Dmsg1(500, "we failed\n", query);
506    PQclear(mdb->result);
507    mdb->result = NULL;
508    mdb->status = 1;                   /* failed */
509    return mdb->status;
510 }
511
512 void my_postgresql_free_result(B_DB *mdb)
513 {
514    
515    db_lock(mdb);
516    if (mdb->result) {
517       PQclear(mdb->result);
518       mdb->result = NULL;
519    }
520
521    if (mdb->row) {
522       free(mdb->row);
523       mdb->row = NULL;
524    }
525
526    if (mdb->fields) {
527       free(mdb->fields);
528       mdb->fields = NULL;
529    }
530    db_unlock(mdb);
531 }
532
533 int my_postgresql_currval(B_DB *mdb, char *table_name)
534 {
535    // Obtain the current value of the sequence that
536    // provides the serial value for primary key of the table.
537
538    // currval is local to our session.  It is not affected by
539    // other transactions.
540
541    // Determine the name of the sequence.
542    // PostgreSQL automatically creates a sequence using
543    // <table>_<column>_seq.
544    // At the time of writing, all tables used this format for
545    // for their primary key: <table>id
546    // Except for basefiles which has a primary key on baseid.
547    // Therefore, we need to special case that one table.
548
549    // everything else can use the PostgreSQL formula.
550
551    char      sequence[NAMEDATALEN-1];
552    char      query   [NAMEDATALEN+50];
553    PGresult *result;
554    int       id = 0;
555
556    if (strcasecmp(table_name, "basefiles") == 0) {
557       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
558    } else {
559       bstrncpy(sequence, table_name, sizeof(sequence));
560       bstrncat(sequence, "_",        sizeof(sequence));
561       bstrncat(sequence, table_name, sizeof(sequence));
562       bstrncat(sequence, "id",       sizeof(sequence));
563    }
564
565    bstrncat(sequence, "_seq", sizeof(sequence));
566    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
567
568    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
569    for (int i=0; i < 10; i++) {
570       result = PQexec(mdb->db, query);
571       if (result) {
572          break;
573       }
574       bmicrosleep(5, 0);
575    }
576    if (!result) {
577       Dmsg1(50, "Query failed: %s\n", query);
578       goto bail_out;
579    }
580
581    Dmsg0(500, "exec done");
582
583    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
584       Dmsg0(500, "getting value");
585       id = atoi(PQgetvalue(result, 0, 0));
586       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
587    } else {
588       Dmsg1(50, "Result status failed: %s\n", query);
589       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
590    }
591
592 bail_out:
593    PQclear(result);
594
595    return id;
596 }
597
598 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
599 {
600    char *query = "COPY batch FROM STDIN";
601
602    Dmsg0(500, "my_postgresql_batch_start started\n");
603
604    if (my_postgresql_query(mdb,
605                            " CREATE TEMPORARY TABLE batch "
606                            "        (fileindex int,       "
607                            "        jobid int,            "
608                            "        path varchar,         "
609                            "        name varchar,         "
610                            "        lstat varchar,        "
611                            "        md5 varchar)") == 1)
612    {
613       Dmsg0(500, "my_postgresql_batch_start failed\n");
614       return 1;
615    }
616    
617    // We are starting a new query.  reset everything.
618    mdb->num_rows     = -1;
619    mdb->row_number   = -1;
620    mdb->field_number = -1;
621
622    if (mdb->result != NULL) {
623       my_postgresql_free_result(mdb);
624    }
625
626    for (int i=0; i < 10; i++) {
627       mdb->result = PQexec(mdb->db, query);
628       if (mdb->result) {
629          break;
630       }
631       bmicrosleep(5, 0);
632    }
633    if (!mdb->result) {
634       Dmsg1(50, "Query failed: %s\n", query);
635       goto bail_out;
636    }
637
638    mdb->status = PQresultStatus(mdb->result);
639    if (mdb->status == PGRES_COPY_IN) {
640       // how many fields in the set?
641       mdb->num_fields = (int) PQnfields(mdb->result);
642       mdb->num_rows   = 0;
643       mdb->status = 1;
644    } else {
645       Dmsg1(50, "Result status failed: %s\n", query);
646       goto bail_out;
647    }
648
649    Dmsg0(500, "my_postgresql_batch_start finishing\n");
650
651    return mdb->status;
652
653 bail_out:
654    mdb->status = 0;
655    PQclear(mdb->result);
656    mdb->result = NULL;
657    return mdb->status;
658 }
659
660 /* set error to something to abort operation */
661 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
662 {
663    int res;
664    int count=30;
665    Dmsg0(500, "my_postgresql_batch_end started\n");
666
667    if (!mdb) {                  /* no files ? */
668       return 0;
669    }
670
671    do { 
672       res = PQputCopyEnd(mdb->db, error);
673    } while (res == 0 && --count > 0);
674
675    if (res == 1) {
676       Dmsg0(500, "ok\n");
677       mdb->status = 1;
678    }
679    
680    if (res <= 0) {
681       Dmsg0(500, "we failed\n");
682       mdb->status = 0;
683       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
684    }
685    
686    Dmsg0(500, "my_postgresql_batch_end finishing\n");
687
688    return mdb->status;
689 }
690
691 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
692 {
693    int res;
694    int count=30;
695    size_t len;
696    char *digest;
697    char ed1[50];
698
699    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
700    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
701
702    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
703    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
704
705    if (ar->Digest == NULL || ar->Digest[0] == 0) {
706       digest = "0";
707    } else {
708       digest = ar->Digest;
709    }
710
711    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
712               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
713               mdb->esc_name, ar->attr, digest);
714
715    do { 
716       res = PQputCopyData(mdb->db,
717                           mdb->cmd,
718                           len);
719    } while (res == 0 && --count > 0);
720
721    if (res == 1) {
722       Dmsg0(500, "ok\n");
723       mdb->changes++;
724       mdb->status = 1;
725    }
726
727    if (res <= 0) {
728       Dmsg0(500, "we failed\n");
729       mdb->status = 0;
730       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
731    }
732
733    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
734
735    return mdb->status;
736 }
737
738 /*
739  * Escape strings so that PostgreSQL is happy on COPY
740  *
741  *   NOTE! len is the length of the old string. Your new
742  *         string must be long enough (max 2*old+1) to hold
743  *         the escaped output.
744  */
745 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
746 {
747    /* we have to escape \t, \n, \r, \ */
748    char c = '\0' ;
749
750    while (len > 0 && *src) {
751       switch (*src) {
752       case '\n':
753          c = 'n';
754          break;
755       case '\\':
756          c = '\\';
757          break;
758       case '\t':
759          c = 't';
760          break;
761       case '\r':
762          c = 'r';
763          break;
764       default:
765          c = '\0' ;
766       }
767
768       if (c) {
769          *dest = '\\';
770          dest++;
771          *dest = c;
772       } else {
773          *dest = *src;
774       }
775
776       len--;
777       src++;
778       dest++;
779    }
780
781    *dest = '\0';
782    return dest;
783 }
784
785 char *my_pg_batch_lock_path_query = "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
786
787
788 char *my_pg_batch_lock_filename_query = "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
789
790 char *my_pg_batch_unlock_tables_query = "COMMIT";
791
792 char *my_pg_batch_fill_path_query = "INSERT INTO Path (Path)                                    "
793                                     "  SELECT a.Path FROM                                       "
794                                     "      (SELECT DISTINCT Path FROM batch) AS a               "
795                                     "  WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
796
797
798 char *my_pg_batch_fill_filename_query = "INSERT INTO Filename (Name)        "
799                                         "  SELECT a.Name FROM               "
800                                         "    (SELECT DISTINCT Name FROM batch) as a "
801                                         "    WHERE NOT EXISTS               "
802                                         "      (SELECT Name FROM Filename WHERE Name = a.Name)";
803 #endif /* HAVE_POSTGRESQL */