]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
kes Implement --with-batch-insert in configure and detection of thread
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation plus additions
11    that are listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static BQUEUE db_list = {&db_list, &db_list};
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (!mult_db_connections) {
90       /* Look to see if DB already open */
91       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
92          if (bstrcmp(mdb->db_name, db_name) &&
93              bstrcmp(mdb->db_address, db_address) &&
94              mdb->db_port == db_port) {
95             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
96             mdb->ref_count++;
97             V(mutex);
98             return mdb;                  /* already open */
99          }
100       }
101    }
102    Dmsg0(100, "db_open first time\n");
103    mdb = (B_DB *)malloc(sizeof(B_DB));
104    memset(mdb, 0, sizeof(B_DB));
105    mdb->db_name = bstrdup(db_name);
106    mdb->db_user = bstrdup(db_user);
107    if (db_password) {
108       mdb->db_password = bstrdup(db_password);
109    }
110    if (db_address) {
111       mdb->db_address  = bstrdup(db_address);
112    }
113    if (db_socket) {
114       mdb->db_socket   = bstrdup(db_socket);
115    }
116    mdb->db_port        = db_port;
117    mdb->have_insert_id = TRUE;
118    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
119    *mdb->errmsg        = 0;
120    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
121    mdb->cached_path    = get_pool_memory(PM_FNAME);
122    mdb->cached_path_id = 0;
123    mdb->ref_count      = 1;
124    mdb->fname          = get_pool_memory(PM_FNAME);
125    mdb->path           = get_pool_memory(PM_FNAME);
126    mdb->esc_name       = get_pool_memory(PM_FNAME);
127    mdb->esc_path      = get_pool_memory(PM_FNAME);
128    mdb->allow_transactions = mult_db_connections;
129    qinsert(&db_list, &mdb->bq);            /* put db in list */
130    V(mutex);
131    return mdb;
132 }
133
134 /*
135  * Now actually open the database.  This can generate errors,
136  *   which are returned in the errmsg
137  *
138  * DO NOT close the database or free(mdb) here !!!!
139  */
140 int
141 db_open_database(JCR *jcr, B_DB *mdb)
142 {
143    int errstat;
144    char buf[10], *port;
145
146 #ifdef xxx
147    if (!PQisthreadsafe()) {
148       Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
149            "PostgreSQL library is not thread safe. Connot continue.\n"));
150    }
151 #endif
152    P(mutex);
153    if (mdb->connected) {
154       V(mutex);
155       return 1;
156    }
157    mdb->connected = false;
158
159    if ((errstat=rwl_init(&mdb->lock)) != 0) {
160       berrno be;
161       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
162             be.bstrerror(errstat));
163       V(mutex);
164       return 0;
165    }
166
167    if (mdb->db_port) {
168       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
169       port = buf;
170    } else {
171       port = NULL;
172    }
173
174    /* If connection fails, try at 5 sec intervals for 30 seconds. */
175    for (int retry=0; retry < 6; retry++) {
176       /* connect to the database */
177       mdb->db = PQsetdbLogin(
178            mdb->db_address,           /* default = localhost */
179            port,                      /* default port */
180            NULL,                      /* pg options */
181            NULL,                      /* tty, ignored */
182            mdb->db_name,              /* database name */
183            mdb->db_user,              /* login name */
184            mdb->db_password);         /* password */
185
186       /* If no connect, try once more in case it is a timing problem */
187       if (PQstatus(mdb->db) == CONNECTION_OK) {
188          break;
189       }
190       bmicrosleep(5, 0);
191    }
192
193    Dmsg0(50, "pg_real_connect done\n");
194    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
195             mdb->db_password==NULL?"(NULL)":mdb->db_password);
196
197    if (PQstatus(mdb->db) != CONNECTION_OK) {
198       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
199             "Database=%s User=%s\n"
200             "It is probably not running or your password is incorrect.\n"),
201              mdb->db_name, mdb->db_user);
202       V(mutex);
203       return 0;
204    }
205
206    mdb->connected = true;
207
208    if (!check_tables_version(jcr, mdb)) {
209       V(mutex);
210       return 0;
211    }
212
213    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
214
215    V(mutex);
216    return 1;
217 }
218
219 void
220 db_close_database(JCR *jcr, B_DB *mdb)
221 {
222    if (!mdb) {
223       return;
224    }
225    db_end_transaction(jcr, mdb);
226    P(mutex);
227    sql_free_result(mdb);
228    mdb->ref_count--;
229    if (mdb->ref_count == 0) {
230       qdchain(&mdb->bq);
231       if (mdb->connected && mdb->db) {
232          sql_close(mdb);
233       }
234       rwl_destroy(&mdb->lock);
235       free_pool_memory(mdb->errmsg);
236       free_pool_memory(mdb->cmd);
237       free_pool_memory(mdb->cached_path);
238       free_pool_memory(mdb->fname);
239       free_pool_memory(mdb->path);
240       free_pool_memory(mdb->esc_name);
241       free_pool_memory(mdb->esc_path);
242       if (mdb->db_name) {
243          free(mdb->db_name);
244       }
245       if (mdb->db_user) {
246          free(mdb->db_user);
247       }
248       if (mdb->db_password) {
249          free(mdb->db_password);
250       }
251       if (mdb->db_address) {
252          free(mdb->db_address);
253       }
254       if (mdb->db_socket) {
255          free(mdb->db_socket);
256       }
257       my_postgresql_free_result(mdb);
258       free(mdb);
259    }
260    V(mutex);
261 }
262
263 void db_thread_cleanup()
264 { }
265
266 /*
267  * Return the next unique index (auto-increment) for
268  * the given table.  Return NULL on error.
269  *
270  * For PostgreSQL, NULL causes the auto-increment value
271  *  to be updated.
272  */
273 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
274 {
275    strcpy(index, "NULL");
276    return 1;
277 }
278
279
280 /*
281  * Escape strings so that PostgreSQL is happy
282  *
283  *   NOTE! len is the length of the old string. Your new
284  *         string must be long enough (max 2*old+1) to hold
285  *         the escaped output.
286  */
287 void
288 db_escape_string(char *snew, char *old, int len)
289 {
290    PQescapeString(snew, old, len);
291 }
292
293 /*
294  * Submit a general SQL command (cmd), and for each row returned,
295  *  the sqlite_handler is called with the ctx.
296  */
297 int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
298 {
299    SQL_ROW row;
300
301    Dmsg0(500, "db_sql_query started\n");
302
303    db_lock(mdb);
304    if (sql_query(mdb, query) != 0) {
305       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
306       db_unlock(mdb);
307       Dmsg0(500, "db_sql_query failed\n");
308       return 0;
309    }
310    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
311
312    if (result_handler != NULL) {
313       Dmsg0(500, "db_sql_query invoking handler\n");
314       if ((mdb->result = sql_store_result(mdb)) != NULL) {
315          int num_fields = sql_num_fields(mdb);
316
317          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
318          while ((row = sql_fetch_row(mdb)) != NULL) {
319
320             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
321             if (result_handler(ctx, num_fields, row))
322                break;
323          }
324
325         sql_free_result(mdb);
326       }
327    }
328    db_unlock(mdb);
329
330    Dmsg0(500, "db_sql_query finished\n");
331
332    return 1;
333 }
334
335
336
337 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
338 {
339    int j;
340    POSTGRESQL_ROW row = NULL; // by default, return NULL
341
342    Dmsg0(500, "my_postgresql_fetch_row start\n");
343
344    if (!mdb->row || mdb->row_size < mdb->num_fields) {
345       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
346
347       if (mdb->row) {
348          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
349          free(mdb->row);
350       }
351       mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields);
352       mdb->row_size = mdb->num_fields;
353
354       // now reset the row_number now that we have the space allocated
355       mdb->row_number = 0;
356    }
357
358    // if still within the result set
359    if (mdb->row_number < mdb->num_rows) {
360       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
361       // get each value from this row
362       for (j = 0; j < mdb->num_fields; j++) {
363          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
364          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
365       }
366       // increment the row number for the next call
367       mdb->row_number++;
368
369       row = mdb->row;
370    } else {
371       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
372    }
373
374    Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
375
376    return row;
377 }
378
379 int my_postgresql_max_length(B_DB *mdb, int field_num) {
380    //
381    // for a given column, find the max length
382    //
383    int max_length;
384    int i;
385    int this_length;
386
387    max_length = 0;
388    for (i = 0; i < mdb->num_rows; i++) {
389       if (PQgetisnull(mdb->result, i, field_num)) {
390           this_length = 4;        // "NULL"
391       } else {
392           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
393       }
394
395       if (max_length < this_length) {
396           max_length = this_length;
397       }
398    }
399
400    return max_length;
401 }
402
403 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
404 {
405    int     i;
406
407    Dmsg0(500, "my_postgresql_fetch_field starts\n");
408
409    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
410       if (mdb->fields) {
411          free(mdb->fields);
412       }
413       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
414       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
415       mdb->fields_size = mdb->num_fields;
416
417       for (i = 0; i < mdb->num_fields; i++) {
418          Dmsg1(500, "filling field %d\n", i);
419          mdb->fields[i].name           = PQfname(mdb->result, i);
420          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
421          mdb->fields[i].type       = PQftype(mdb->result, i);
422          mdb->fields[i].flags      = 0;
423
424          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
425             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
426             mdb->fields[i].flags);
427       } // end for
428    } // end if
429
430    // increment field number for the next time around
431
432    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
433    return &mdb->fields[mdb->field_number++];
434 }
435
436 void my_postgresql_data_seek(B_DB *mdb, int row)
437 {
438    // set the row number to be returned on the next call
439    // to my_postgresql_fetch_row
440    mdb->row_number = row;
441 }
442
443 void my_postgresql_field_seek(B_DB *mdb, int field)
444 {
445    mdb->field_number = field;
446 }
447
448 /*
449  * Note, if this routine returns 1 (failure), Bacula expects
450  *  that no result has been stored.
451  * This is where QUERY_DB comes with Postgresql.
452  *
453  *  Returns:  0  on success
454  *            1  on failure
455  *
456  */
457 int my_postgresql_query(B_DB *mdb, const char *query)
458 {
459    Dmsg0(500, "my_postgresql_query started\n");
460    // We are starting a new query.  reset everything.
461    mdb->num_rows     = -1;
462    mdb->row_number   = -1;
463    mdb->field_number = -1;
464
465    if (mdb->result) {
466       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
467       mdb->result = NULL;
468    }
469
470    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
471
472    for (int i=0; i < 10; i++) {
473       mdb->result = PQexec(mdb->db, query);
474       if (mdb->result) {
475          break;
476       }
477       bmicrosleep(5, 0);
478    }
479    if (!mdb->result) {
480       Dmsg1(50, "Query failed: %s\n", query);
481       goto bail_out;
482    }
483
484    mdb->status = PQresultStatus(mdb->result);
485    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
486       Dmsg1(500, "we have a result\n", query);
487
488       // how many fields in the set?
489       mdb->num_fields = (int)PQnfields(mdb->result);
490       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
491
492       mdb->num_rows = PQntuples(mdb->result);
493       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
494
495       mdb->status = 0;                  /* succeed */
496    } else {
497       Dmsg1(50, "Result status failed: %s\n", query);
498       goto bail_out;
499    }
500
501    Dmsg0(500, "my_postgresql_query finishing\n");
502    return mdb->status;
503
504 bail_out:
505    Dmsg1(500, "we failed\n", query);
506    PQclear(mdb->result);
507    mdb->result = NULL;
508    mdb->status = 1;                   /* failed */
509    return mdb->status;
510 }
511
512 void my_postgresql_free_result(B_DB *mdb)
513 {
514    if (mdb->result) {
515       PQclear(mdb->result);
516       mdb->result = NULL;
517    }
518
519    if (mdb->row) {
520       free(mdb->row);
521       mdb->row = NULL;
522    }
523
524    if (mdb->fields) {
525       free(mdb->fields);
526       mdb->fields = NULL;
527    }
528 }
529
530 int my_postgresql_currval(B_DB *mdb, char *table_name)
531 {
532    // Obtain the current value of the sequence that
533    // provides the serial value for primary key of the table.
534
535    // currval is local to our session.  It is not affected by
536    // other transactions.
537
538    // Determine the name of the sequence.
539    // PostgreSQL automatically creates a sequence using
540    // <table>_<column>_seq.
541    // At the time of writing, all tables used this format for
542    // for their primary key: <table>id
543    // Except for basefiles which has a primary key on baseid.
544    // Therefore, we need to special case that one table.
545
546    // everything else can use the PostgreSQL formula.
547
548    char      sequence[NAMEDATALEN-1];
549    char      query   [NAMEDATALEN+50];
550    PGresult *result;
551    int       id = 0;
552
553    if (strcasecmp(table_name, "basefiles") == 0) {
554       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
555    } else {
556       bstrncpy(sequence, table_name, sizeof(sequence));
557       bstrncat(sequence, "_",        sizeof(sequence));
558       bstrncat(sequence, table_name, sizeof(sequence));
559       bstrncat(sequence, "id",       sizeof(sequence));
560    }
561
562    bstrncat(sequence, "_seq", sizeof(sequence));
563    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
564
565    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
566    for (int i=0; i < 10; i++) {
567       result = PQexec(mdb->db, query);
568       if (result) {
569          break;
570       }
571       bmicrosleep(5, 0);
572    }
573    if (!result) {
574       Dmsg1(50, "Query failed: %s\n", query);
575       goto bail_out;
576    }
577
578    Dmsg0(500, "exec done");
579
580    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
581       Dmsg0(500, "getting value");
582       id = atoi(PQgetvalue(result, 0, 0));
583       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
584    } else {
585       Dmsg1(50, "Result status failed: %s\n", query);
586       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
587    }
588
589 bail_out:
590    PQclear(result);
591
592    return id;
593 }
594
595 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
596 {
597    char *query = "COPY batch FROM STDIN";
598
599    Dmsg0(500, "my_postgresql_batch_start started\n");
600
601    if (my_postgresql_query(mdb,
602                            " CREATE TEMPORARY TABLE batch "
603                            "        (fileindex int,       "
604                            "        jobid int,            "
605                            "        path varchar,         "
606                            "        name varchar,         "
607                            "        lstat varchar,        "
608                            "        md5 varchar)") == 1)
609    {
610       Dmsg0(500, "my_postgresql_batch_start failed\n");
611       return 1;
612    }
613    
614    // We are starting a new query.  reset everything.
615    mdb->num_rows     = -1;
616    mdb->row_number   = -1;
617    mdb->field_number = -1;
618
619    if (mdb->result != NULL) {
620       my_postgresql_free_result(mdb);
621    }
622
623    for (int i=0; i < 10; i++) {
624       mdb->result = PQexec(mdb->db, query);
625       if (mdb->result) {
626          break;
627       }
628       bmicrosleep(5, 0);
629    }
630    if (!mdb->result) {
631       Dmsg1(50, "Query failed: %s\n", query);
632       goto bail_out;
633    }
634
635    mdb->status = PQresultStatus(mdb->result);
636    if (mdb->status == PGRES_COPY_IN) {
637       // how many fields in the set?
638       mdb->num_fields = (int) PQnfields(mdb->result);
639       mdb->num_rows   = 0;
640       mdb->status = 1;
641    } else {
642       Dmsg1(50, "Result status failed: %s\n", query);
643       goto bail_out;
644    }
645
646    Dmsg0(500, "my_postgresql_batch_start finishing\n");
647
648    return mdb->status;
649
650 bail_out:
651    mdb->status = 0;
652    PQclear(mdb->result);
653    mdb->result = NULL;
654    return mdb->status;
655 }
656
657 /* set error to something to abort operation */
658 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
659 {
660    int res;
661    int count=30;
662    Dmsg0(500, "my_postgresql_batch_end started\n");
663
664    if (!mdb) {                  /* no files ? */
665       return 0;
666    }
667
668    do { 
669       res = PQputCopyEnd(mdb->db, error);
670    } while (res == 0 && --count > 0);
671
672    if (res == 1) {
673       Dmsg0(500, "ok\n");
674       mdb->status = 1;
675    }
676    
677    if (res <= 0) {
678       Dmsg0(500, "we failed\n");
679       mdb->status = 0;
680       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
681    }
682    
683    Dmsg0(500, "my_postgresql_batch_end finishing\n");
684
685    return mdb->status;
686 }
687
688 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
689 {
690    int res;
691    int count=30;
692    size_t len;
693    char *digest;
694    char ed1[50];
695
696    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
697    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
698
699    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
700    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
701
702    if (ar->Digest == NULL || ar->Digest[0] == 0) {
703       digest = "0";
704    } else {
705       digest = ar->Digest;
706    }
707
708    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
709               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
710               mdb->esc_name, ar->attr, digest);
711
712    do { 
713       res = PQputCopyData(mdb->db,
714                           mdb->cmd,
715                           len);
716    } while (res == 0 && --count > 0);
717
718    if (res == 1) {
719       Dmsg0(500, "ok\n");
720       mdb->changes++;
721       mdb->status = 1;
722    }
723
724    if (res <= 0) {
725       Dmsg0(500, "we failed\n");
726       mdb->status = 0;
727       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
728    }
729
730    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
731
732    return mdb->status;
733 }
734
735 /*
736  * Escape strings so that PostgreSQL is happy on COPY
737  *
738  *   NOTE! len is the length of the old string. Your new
739  *         string must be long enough (max 2*old+1) to hold
740  *         the escaped output.
741  */
742 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
743 {
744    /* we have to escape \t, \n, \r, \ */
745    char c = '\0' ;
746
747    while (len > 0 && *src) {
748       switch (*src) {
749       case '\n':
750          c = 'n';
751          break;
752       case '\\':
753          c = '\\';
754          break;
755       case '\t':
756          c = 't';
757          break;
758       case '\r':
759          c = 'r';
760          break;
761       default:
762          c = '\0' ;
763       }
764
765       if (c) {
766          *dest = '\\';
767          dest++;
768          *dest = c;
769       } else {
770          *dest = *src;
771       }
772
773       len--;
774       src++;
775       dest++;
776    }
777
778    *dest = '\0';
779    return dest;
780 }
781
782 char *my_pg_batch_lock_path_query = "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
783
784
785 char *my_pg_batch_lock_filename_query = "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
786
787 char *my_pg_batch_unlock_tables_query = "COMMIT";
788
789 char *my_pg_batch_fill_path_query = "INSERT INTO Path (Path)                                    "
790                                     "  SELECT a.Path FROM                                       "
791                                     "      (SELECT DISTINCT Path FROM batch) AS a               "
792                                     "  WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
793
794
795 char *my_pg_batch_fill_filename_query = "INSERT INTO Filename (Name)        "
796                                         "  SELECT a.Name FROM               "
797                                         "    (SELECT DISTINCT Name FROM batch) as a "
798                                         "    WHERE NOT EXISTS               "
799                                         "      (SELECT Name FROM Filename WHERE Name = a.Name)";
800 #endif /* HAVE_POSTGRESQL */