]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
Remove inappropriate my_postgresql_free_result() in db_close()
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static BQUEUE db_list = {&db_list, &db_list};
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (!mult_db_connections) {
90       /* Look to see if DB already open */
91       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
92          if (bstrcmp(mdb->db_name, db_name) &&
93              bstrcmp(mdb->db_address, db_address) &&
94              mdb->db_port == db_port) {
95             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
96             mdb->ref_count++;
97             V(mutex);
98             return mdb;                  /* already open */
99          }
100       }
101    }
102    Dmsg0(100, "db_open first time\n");
103    mdb = (B_DB *)malloc(sizeof(B_DB));
104    memset(mdb, 0, sizeof(B_DB));
105    mdb->db_name = bstrdup(db_name);
106    mdb->db_user = bstrdup(db_user);
107    if (db_password) {
108       mdb->db_password = bstrdup(db_password);
109    }
110    if (db_address) {
111       mdb->db_address  = bstrdup(db_address);
112    }
113    if (db_socket) {
114       mdb->db_socket   = bstrdup(db_socket);
115    }
116    mdb->db_port        = db_port;
117    mdb->have_insert_id = TRUE;
118    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
119    *mdb->errmsg        = 0;
120    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
121    mdb->cached_path    = get_pool_memory(PM_FNAME);
122    mdb->cached_path_id = 0;
123    mdb->ref_count      = 1;
124    mdb->fname          = get_pool_memory(PM_FNAME);
125    mdb->path           = get_pool_memory(PM_FNAME);
126    mdb->esc_name       = get_pool_memory(PM_FNAME);
127    mdb->esc_path      = get_pool_memory(PM_FNAME);
128    mdb->allow_transactions = mult_db_connections;
129    qinsert(&db_list, &mdb->bq);            /* put db in list */
130    V(mutex);
131    return mdb;
132 }
133
134 /*
135  * Now actually open the database.  This can generate errors,
136  *   which are returned in the errmsg
137  *
138  * DO NOT close the database or free(mdb) here !!!!
139  */
140 int
141 db_open_database(JCR *jcr, B_DB *mdb)
142 {
143    int errstat;
144    char buf[10], *port;
145
146 #ifdef xxx
147    if (!PQisthreadsafe()) {
148       Jmsg(jcr, M_ABORT, 0, _("PostgreSQL configuration problem. "          
149            "PostgreSQL library is not thread safe. Connot continue.\n"));
150    }
151 #endif
152    P(mutex);
153    if (mdb->connected) {
154       V(mutex);
155       return 1;
156    }
157    mdb->connected = false;
158
159    if ((errstat=rwl_init(&mdb->lock)) != 0) {
160       berrno be;
161       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
162             be.bstrerror(errstat));
163       V(mutex);
164       return 0;
165    }
166
167    if (mdb->db_port) {
168       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
169       port = buf;
170    } else {
171       port = NULL;
172    }
173
174    /* If connection fails, try at 5 sec intervals for 30 seconds. */
175    for (int retry=0; retry < 6; retry++) {
176       /* connect to the database */
177       mdb->db = PQsetdbLogin(
178            mdb->db_address,           /* default = localhost */
179            port,                      /* default port */
180            NULL,                      /* pg options */
181            NULL,                      /* tty, ignored */
182            mdb->db_name,              /* database name */
183            mdb->db_user,              /* login name */
184            mdb->db_password);         /* password */
185
186       /* If no connect, try once more in case it is a timing problem */
187       if (PQstatus(mdb->db) == CONNECTION_OK) {
188          break;
189       }
190       bmicrosleep(5, 0);
191    }
192
193    Dmsg0(50, "pg_real_connect done\n");
194    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
195             mdb->db_password==NULL?"(NULL)":mdb->db_password);
196
197    if (PQstatus(mdb->db) != CONNECTION_OK) {
198       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
199             "Database=%s User=%s\n"
200             "It is probably not running or your password is incorrect.\n"),
201              mdb->db_name, mdb->db_user);
202       V(mutex);
203       return 0;
204    }
205
206    mdb->connected = true;
207
208    if (!check_tables_version(jcr, mdb)) {
209       V(mutex);
210       return 0;
211    }
212
213    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
214
215    V(mutex);
216    return 1;
217 }
218
219 void
220 db_close_database(JCR *jcr, B_DB *mdb)
221 {
222    if (!mdb) {
223       return;
224    }
225    db_end_transaction(jcr, mdb);
226    P(mutex);
227    sql_free_result(mdb);
228    mdb->ref_count--;
229    if (mdb->ref_count == 0) {
230       qdchain(&mdb->bq);
231       if (mdb->connected && mdb->db) {
232          sql_close(mdb);
233       }
234       rwl_destroy(&mdb->lock);
235       free_pool_memory(mdb->errmsg);
236       free_pool_memory(mdb->cmd);
237       free_pool_memory(mdb->cached_path);
238       free_pool_memory(mdb->fname);
239       free_pool_memory(mdb->path);
240       free_pool_memory(mdb->esc_name);
241       free_pool_memory(mdb->esc_path);
242       if (mdb->db_name) {
243          free(mdb->db_name);
244       }
245       if (mdb->db_user) {
246          free(mdb->db_user);
247       }
248       if (mdb->db_password) {
249          free(mdb->db_password);
250       }
251       if (mdb->db_address) {
252          free(mdb->db_address);
253       }
254       if (mdb->db_socket) {
255          free(mdb->db_socket);
256       }
257       free(mdb);
258    }
259    V(mutex);
260 }
261
262 void db_thread_cleanup()
263 { }
264
265 /*
266  * Return the next unique index (auto-increment) for
267  * the given table.  Return NULL on error.
268  *
269  * For PostgreSQL, NULL causes the auto-increment value
270  *  to be updated.
271  */
272 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
273 {
274    strcpy(index, "NULL");
275    return 1;
276 }
277
278
279 /*
280  * Escape strings so that PostgreSQL is happy
281  *
282  *   NOTE! len is the length of the old string. Your new
283  *         string must be long enough (max 2*old+1) to hold
284  *         the escaped output.
285  */
286 void
287 db_escape_string(char *snew, char *old, int len)
288 {
289    PQescapeString(snew, old, len);
290 }
291
292 /*
293  * Submit a general SQL command (cmd), and for each row returned,
294  *  the sqlite_handler is called with the ctx.
295  */
296 int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
297 {
298    SQL_ROW row;
299
300    Dmsg0(500, "db_sql_query started\n");
301
302    db_lock(mdb);
303    if (sql_query(mdb, query) != 0) {
304       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
305       db_unlock(mdb);
306       Dmsg0(500, "db_sql_query failed\n");
307       return 0;
308    }
309    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
310
311    if (result_handler != NULL) {
312       Dmsg0(500, "db_sql_query invoking handler\n");
313       if ((mdb->result = sql_store_result(mdb)) != NULL) {
314          int num_fields = sql_num_fields(mdb);
315
316          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
317          while ((row = sql_fetch_row(mdb)) != NULL) {
318
319             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
320             if (result_handler(ctx, num_fields, row))
321                break;
322          }
323
324         sql_free_result(mdb);
325       }
326    }
327    db_unlock(mdb);
328
329    Dmsg0(500, "db_sql_query finished\n");
330
331    return 1;
332 }
333
334
335
336 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
337 {
338    int j;
339    POSTGRESQL_ROW row = NULL; // by default, return NULL
340
341    Dmsg0(500, "my_postgresql_fetch_row start\n");
342
343    if (!mdb->row || mdb->row_size < mdb->num_fields) {
344       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
345
346       if (mdb->row) {
347          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
348          free(mdb->row);
349       }
350       mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields);
351       mdb->row_size = mdb->num_fields;
352
353       // now reset the row_number now that we have the space allocated
354       mdb->row_number = 0;
355    }
356
357    // if still within the result set
358    if (mdb->row_number < mdb->num_rows) {
359       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
360       // get each value from this row
361       for (j = 0; j < mdb->num_fields; j++) {
362          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
363          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
364       }
365       // increment the row number for the next call
366       mdb->row_number++;
367
368       row = mdb->row;
369    } else {
370       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
371    }
372
373    Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
374
375    return row;
376 }
377
378 int my_postgresql_max_length(B_DB *mdb, int field_num) {
379    //
380    // for a given column, find the max length
381    //
382    int max_length;
383    int i;
384    int this_length;
385
386    max_length = 0;
387    for (i = 0; i < mdb->num_rows; i++) {
388       if (PQgetisnull(mdb->result, i, field_num)) {
389           this_length = 4;        // "NULL"
390       } else {
391           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
392       }
393
394       if (max_length < this_length) {
395           max_length = this_length;
396       }
397    }
398
399    return max_length;
400 }
401
402 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
403 {
404    int     i;
405
406    Dmsg0(500, "my_postgresql_fetch_field starts\n");
407
408    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
409       if (mdb->fields) {
410          free(mdb->fields);
411       }
412       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
413       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
414       mdb->fields_size = mdb->num_fields;
415
416       for (i = 0; i < mdb->num_fields; i++) {
417          Dmsg1(500, "filling field %d\n", i);
418          mdb->fields[i].name           = PQfname(mdb->result, i);
419          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
420          mdb->fields[i].type       = PQftype(mdb->result, i);
421          mdb->fields[i].flags      = 0;
422
423          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
424             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
425             mdb->fields[i].flags);
426       } // end for
427    } // end if
428
429    // increment field number for the next time around
430
431    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
432    return &mdb->fields[mdb->field_number++];
433 }
434
435 void my_postgresql_data_seek(B_DB *mdb, int row)
436 {
437    // set the row number to be returned on the next call
438    // to my_postgresql_fetch_row
439    mdb->row_number = row;
440 }
441
442 void my_postgresql_field_seek(B_DB *mdb, int field)
443 {
444    mdb->field_number = field;
445 }
446
447 /*
448  * Note, if this routine returns 1 (failure), Bacula expects
449  *  that no result has been stored.
450  * This is where QUERY_DB comes with Postgresql.
451  *
452  *  Returns:  0  on success
453  *            1  on failure
454  *
455  */
456 int my_postgresql_query(B_DB *mdb, const char *query)
457 {
458    Dmsg0(500, "my_postgresql_query started\n");
459    // We are starting a new query.  reset everything.
460    mdb->num_rows     = -1;
461    mdb->row_number   = -1;
462    mdb->field_number = -1;
463
464    if (mdb->result) {
465       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
466       mdb->result = NULL;
467    }
468
469    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
470
471    for (int i=0; i < 10; i++) {
472       mdb->result = PQexec(mdb->db, query);
473       if (mdb->result) {
474          break;
475       }
476       bmicrosleep(5, 0);
477    }
478    if (!mdb->result) {
479       Dmsg1(50, "Query failed: %s\n", query);
480       goto bail_out;
481    }
482
483    mdb->status = PQresultStatus(mdb->result);
484    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
485       Dmsg1(500, "we have a result\n", query);
486
487       // how many fields in the set?
488       mdb->num_fields = (int)PQnfields(mdb->result);
489       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
490
491       mdb->num_rows = PQntuples(mdb->result);
492       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
493
494       mdb->status = 0;                  /* succeed */
495    } else {
496       Dmsg1(50, "Result status failed: %s\n", query);
497       goto bail_out;
498    }
499
500    Dmsg0(500, "my_postgresql_query finishing\n");
501    return mdb->status;
502
503 bail_out:
504    Dmsg1(500, "we failed\n", query);
505    PQclear(mdb->result);
506    mdb->result = NULL;
507    mdb->status = 1;                   /* failed */
508    return mdb->status;
509 }
510
511 void my_postgresql_free_result(B_DB *mdb)
512 {
513    
514    db_lock(mdb);
515    if (mdb->result) {
516       PQclear(mdb->result);
517       mdb->result = NULL;
518    }
519
520    if (mdb->row) {
521       free(mdb->row);
522       mdb->row = NULL;
523    }
524
525    if (mdb->fields) {
526       free(mdb->fields);
527       mdb->fields = NULL;
528    }
529    db_unlock(mdb);
530 }
531
532 int my_postgresql_currval(B_DB *mdb, char *table_name)
533 {
534    // Obtain the current value of the sequence that
535    // provides the serial value for primary key of the table.
536
537    // currval is local to our session.  It is not affected by
538    // other transactions.
539
540    // Determine the name of the sequence.
541    // PostgreSQL automatically creates a sequence using
542    // <table>_<column>_seq.
543    // At the time of writing, all tables used this format for
544    // for their primary key: <table>id
545    // Except for basefiles which has a primary key on baseid.
546    // Therefore, we need to special case that one table.
547
548    // everything else can use the PostgreSQL formula.
549
550    char      sequence[NAMEDATALEN-1];
551    char      query   [NAMEDATALEN+50];
552    PGresult *result;
553    int       id = 0;
554
555    if (strcasecmp(table_name, "basefiles") == 0) {
556       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
557    } else {
558       bstrncpy(sequence, table_name, sizeof(sequence));
559       bstrncat(sequence, "_",        sizeof(sequence));
560       bstrncat(sequence, table_name, sizeof(sequence));
561       bstrncat(sequence, "id",       sizeof(sequence));
562    }
563
564    bstrncat(sequence, "_seq", sizeof(sequence));
565    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
566
567    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
568    for (int i=0; i < 10; i++) {
569       result = PQexec(mdb->db, query);
570       if (result) {
571          break;
572       }
573       bmicrosleep(5, 0);
574    }
575    if (!result) {
576       Dmsg1(50, "Query failed: %s\n", query);
577       goto bail_out;
578    }
579
580    Dmsg0(500, "exec done");
581
582    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
583       Dmsg0(500, "getting value");
584       id = atoi(PQgetvalue(result, 0, 0));
585       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
586    } else {
587       Dmsg1(50, "Result status failed: %s\n", query);
588       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
589    }
590
591 bail_out:
592    PQclear(result);
593
594    return id;
595 }
596
597 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
598 {
599    char *query = "COPY batch FROM STDIN";
600
601    Dmsg0(500, "my_postgresql_batch_start started\n");
602
603    if (my_postgresql_query(mdb,
604                            " CREATE TEMPORARY TABLE batch "
605                            "        (fileindex int,       "
606                            "        jobid int,            "
607                            "        path varchar,         "
608                            "        name varchar,         "
609                            "        lstat varchar,        "
610                            "        md5 varchar)") == 1)
611    {
612       Dmsg0(500, "my_postgresql_batch_start failed\n");
613       return 1;
614    }
615    
616    // We are starting a new query.  reset everything.
617    mdb->num_rows     = -1;
618    mdb->row_number   = -1;
619    mdb->field_number = -1;
620
621    my_postgresql_free_result(mdb);
622
623    for (int i=0; i < 10; i++) {
624       mdb->result = PQexec(mdb->db, query);
625       if (mdb->result) {
626          break;
627       }
628       bmicrosleep(5, 0);
629    }
630    if (!mdb->result) {
631       Dmsg1(50, "Query failed: %s\n", query);
632       goto bail_out;
633    }
634
635    mdb->status = PQresultStatus(mdb->result);
636    if (mdb->status == PGRES_COPY_IN) {
637       // how many fields in the set?
638       mdb->num_fields = (int) PQnfields(mdb->result);
639       mdb->num_rows   = 0;
640       mdb->status = 1;
641    } else {
642       Dmsg1(50, "Result status failed: %s\n", query);
643       goto bail_out;
644    }
645
646    Dmsg0(500, "my_postgresql_batch_start finishing\n");
647
648    return mdb->status;
649
650 bail_out:
651    mdb->status = 0;
652    PQclear(mdb->result);
653    mdb->result = NULL;
654    return mdb->status;
655 }
656
657 /* set error to something to abort operation */
658 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
659 {
660    int res;
661    int count=30;
662    Dmsg0(500, "my_postgresql_batch_end started\n");
663
664    if (!mdb) {                  /* no files ? */
665       return 0;
666    }
667
668    do { 
669       res = PQputCopyEnd(mdb->db, error);
670    } while (res == 0 && --count > 0);
671
672    if (res == 1) {
673       Dmsg0(500, "ok\n");
674       mdb->status = 1;
675    }
676    
677    if (res <= 0) {
678       Dmsg0(500, "we failed\n");
679       mdb->status = 0;
680       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
681    }
682    
683    Dmsg0(500, "my_postgresql_batch_end finishing\n");
684
685    return mdb->status;
686 }
687
688 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
689 {
690    int res;
691    int count=30;
692    size_t len;
693    char *digest;
694    char ed1[50];
695
696    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
697    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
698
699    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
700    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
701
702    if (ar->Digest == NULL || ar->Digest[0] == 0) {
703       digest = "0";
704    } else {
705       digest = ar->Digest;
706    }
707
708    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
709               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
710               mdb->esc_name, ar->attr, digest);
711
712    do { 
713       res = PQputCopyData(mdb->db,
714                           mdb->cmd,
715                           len);
716    } while (res == 0 && --count > 0);
717
718    if (res == 1) {
719       Dmsg0(500, "ok\n");
720       mdb->changes++;
721       mdb->status = 1;
722    }
723
724    if (res <= 0) {
725       Dmsg0(500, "we failed\n");
726       mdb->status = 0;
727       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
728    }
729
730    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
731
732    return mdb->status;
733 }
734
735 /*
736  * Escape strings so that PostgreSQL is happy on COPY
737  *
738  *   NOTE! len is the length of the old string. Your new
739  *         string must be long enough (max 2*old+1) to hold
740  *         the escaped output.
741  */
742 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
743 {
744    /* we have to escape \t, \n, \r, \ */
745    char c = '\0' ;
746
747    while (len > 0 && *src) {
748       switch (*src) {
749       case '\n':
750          c = 'n';
751          break;
752       case '\\':
753          c = '\\';
754          break;
755       case '\t':
756          c = 't';
757          break;
758       case '\r':
759          c = 'r';
760          break;
761       default:
762          c = '\0' ;
763       }
764
765       if (c) {
766          *dest = '\\';
767          dest++;
768          *dest = c;
769       } else {
770          *dest = *src;
771       }
772
773       len--;
774       src++;
775       dest++;
776    }
777
778    *dest = '\0';
779    return dest;
780 }
781
782 char *my_pg_batch_lock_path_query = "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
783
784
785 char *my_pg_batch_lock_filename_query = "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
786
787 char *my_pg_batch_unlock_tables_query = "COMMIT";
788
789 char *my_pg_batch_fill_path_query = "INSERT INTO Path (Path)                                    "
790                                     "  SELECT a.Path FROM                                       "
791                                     "      (SELECT DISTINCT Path FROM batch) AS a               "
792                                     "  WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
793
794
795 char *my_pg_batch_fill_filename_query = "INSERT INTO Filename (Name)        "
796                                         "  SELECT a.Name FROM               "
797                                         "    (SELECT DISTINCT Name FROM batch) as a "
798                                         "    WHERE NOT EXISTS               "
799                                         "      (SELECT Name FROM Filename WHERE Name = a.Name)";
800 #endif /* HAVE_POSTGRESQL */