]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
ebl Use btime_t instead of uint64_t in media patch.
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2  * Bacula Catalog Database routines specific to PostgreSQL
3  *   These are PostgreSQL specific routines
4  *
5  *    Dan Langille, December 2003
6  *    based upon work done by Kern Sibbald, March 2000
7  *
8  *    Version $Id$
9  */
10 /*
11    Bacula® - The Network Backup Solution
12
13    Copyright (C) 2003-2006 Free Software Foundation Europe e.V.
14
15    The main author of Bacula is Kern Sibbald, with contributions from
16    many others, a complete list can be found in the file AUTHORS.
17    This program is Free Software; you can redistribute it and/or
18    modify it under the terms of version two of the GNU General Public
19    License as published by the Free Software Foundation plus additions
20    that are listed in the file LICENSE.
21
22    This program is distributed in the hope that it will be useful, but
23    WITHOUT ANY WARRANTY; without even the implied warranty of
24    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
25    General Public License for more details.
26
27    You should have received a copy of the GNU General Public License
28    along with this program; if not, write to the Free Software
29    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
30    02110-1301, USA.
31
32    Bacula® is a registered trademark of John Walker.
33    The licensor of Bacula is the Free Software Foundation Europe
34    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
35    Switzerland, email:ftf@fsfeurope.org.
36 */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static BQUEUE db_list = {&db_list, &db_list};
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (!mult_db_connections) {
90       /* Look to see if DB already open */
91       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
92          if (bstrcmp(mdb->db_name, db_name) &&
93              bstrcmp(mdb->db_address, db_address) &&
94              mdb->db_port == db_port) {
95             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
96             mdb->ref_count++;
97             V(mutex);
98             return mdb;                  /* already open */
99          }
100       }
101    }
102    Dmsg0(100, "db_open first time\n");
103    mdb = (B_DB *)malloc(sizeof(B_DB));
104    memset(mdb, 0, sizeof(B_DB));
105    mdb->db_name = bstrdup(db_name);
106    mdb->db_user = bstrdup(db_user);
107    if (db_password) {
108       mdb->db_password = bstrdup(db_password);
109    }
110    if (db_address) {
111       mdb->db_address  = bstrdup(db_address);
112    }
113    if (db_socket) {
114       mdb->db_socket   = bstrdup(db_socket);
115    }
116    mdb->db_port        = db_port;
117    mdb->have_insert_id = TRUE;
118    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
119    *mdb->errmsg        = 0;
120    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
121    mdb->cached_path    = get_pool_memory(PM_FNAME);
122    mdb->cached_path_id = 0;
123    mdb->ref_count      = 1;
124    mdb->fname          = get_pool_memory(PM_FNAME);
125    mdb->path           = get_pool_memory(PM_FNAME);
126    mdb->esc_name       = get_pool_memory(PM_FNAME);
127    mdb->esc_name2      = get_pool_memory(PM_FNAME);
128    mdb->allow_transactions = mult_db_connections;
129    qinsert(&db_list, &mdb->bq);            /* put db in list */
130    V(mutex);
131    return mdb;
132 }
133
134 /*
135  * Now actually open the database.  This can generate errors,
136  *   which are returned in the errmsg
137  *
138  * DO NOT close the database or free(mdb) here !!!!
139  */
140 int
141 db_open_database(JCR *jcr, B_DB *mdb)
142 {
143    int errstat;
144    char buf[10], *port;
145
146    P(mutex);
147    if (mdb->connected) {
148       V(mutex);
149       return 1;
150    }
151    mdb->connected = false;
152
153    if ((errstat=rwl_init(&mdb->lock)) != 0) {
154       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
155             strerror(errstat));
156       V(mutex);
157       return 0;
158    }
159
160    if (mdb->db_port) {
161       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
162       port = buf;
163    } else {
164       port = NULL;
165    }
166
167    /* If connection fails, try at 5 sec intervals for 30 seconds. */
168    for (int retry=0; retry < 6; retry++) {
169       /* connect to the database */
170       mdb->db = PQsetdbLogin(
171            mdb->db_address,           /* default = localhost */
172            port,                      /* default port */
173            NULL,                      /* pg options */
174            NULL,                      /* tty, ignored */
175            mdb->db_name,              /* database name */
176            mdb->db_user,              /* login name */
177            mdb->db_password);         /* password */
178
179       /* If no connect, try once more in case it is a timing problem */
180       if (PQstatus(mdb->db) == CONNECTION_OK) {
181          break;
182       }
183       bmicrosleep(5, 0);
184    }
185
186    Dmsg0(50, "pg_real_connect done\n");
187    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
188             mdb->db_password==NULL?"(NULL)":mdb->db_password);
189
190    if (PQstatus(mdb->db) != CONNECTION_OK) {
191       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
192             "Database=%s User=%s\n"
193             "It is probably not running or your password is incorrect.\n"),
194              mdb->db_name, mdb->db_user);
195       V(mutex);
196       return 0;
197    }
198
199    if (!check_tables_version(jcr, mdb)) {
200       V(mutex);
201       return 0;
202    }
203
204    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
205
206    mdb->connected = true;
207    V(mutex);
208    return 1;
209 }
210
211 void
212 db_close_database(JCR *jcr, B_DB *mdb)
213 {
214    if (!mdb) {
215       return;
216    }
217    db_end_transaction(jcr, mdb);
218    P(mutex);
219    mdb->ref_count--;
220    if (mdb->ref_count == 0) {
221       qdchain(&mdb->bq);
222       if (mdb->connected && mdb->db) {
223          sql_close(mdb);
224       }
225       rwl_destroy(&mdb->lock);
226       free_pool_memory(mdb->errmsg);
227       free_pool_memory(mdb->cmd);
228       free_pool_memory(mdb->cached_path);
229       free_pool_memory(mdb->fname);
230       free_pool_memory(mdb->path);
231       free_pool_memory(mdb->esc_name);
232       free_pool_memory(mdb->esc_name2);
233       if (mdb->db_name) {
234          free(mdb->db_name);
235       }
236       if (mdb->db_user) {
237          free(mdb->db_user);
238       }
239       if (mdb->db_password) {
240          free(mdb->db_password);
241       }
242       if (mdb->db_address) {
243          free(mdb->db_address);
244       }
245       if (mdb->db_socket) {
246          free(mdb->db_socket);
247       }
248       my_postgresql_free_result(mdb);
249       free(mdb);
250    }
251    V(mutex);
252 }
253
254 /*
255  * Return the next unique index (auto-increment) for
256  * the given table.  Return NULL on error.
257  *
258  * For PostgreSQL, NULL causes the auto-increment value
259  *  to be updated.
260  */
261 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
262 {
263    strcpy(index, "NULL");
264    return 1;
265 }
266
267
268 /*
269  * Escape strings so that PostgreSQL is happy
270  *
271  *   NOTE! len is the length of the old string. Your new
272  *         string must be long enough (max 2*old+1) to hold
273  *         the escaped output.
274  */
275 void
276 db_escape_string(char *snew, char *old, int len)
277 {
278    PQescapeString(snew, old, len);
279 }
280
281 /*
282  * Submit a general SQL command (cmd), and for each row returned,
283  *  the sqlite_handler is called with the ctx.
284  */
285 int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
286 {
287    SQL_ROW row;
288
289    Dmsg0(500, "db_sql_query started\n");
290
291    db_lock(mdb);
292    if (sql_query(mdb, query) != 0) {
293       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
294       db_unlock(mdb);
295       Dmsg0(500, "db_sql_query failed\n");
296       return 0;
297    }
298    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
299
300    if (result_handler != NULL) {
301       Dmsg0(500, "db_sql_query invoking handler\n");
302       if ((mdb->result = sql_store_result(mdb)) != NULL) {
303          int num_fields = sql_num_fields(mdb);
304
305          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
306          while ((row = sql_fetch_row(mdb)) != NULL) {
307
308             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
309             if (result_handler(ctx, num_fields, row))
310                break;
311          }
312
313         sql_free_result(mdb);
314       }
315    }
316    db_unlock(mdb);
317
318    Dmsg0(500, "db_sql_query finished\n");
319
320    return 1;
321 }
322
323
324
325 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
326 {
327    int j;
328    POSTGRESQL_ROW row = NULL; // by default, return NULL
329
330    Dmsg0(500, "my_postgresql_fetch_row start\n");
331
332    if (mdb->row_number == -1 || mdb->row == NULL) {
333       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
334
335       if (mdb->row != NULL) {
336          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
337          free(mdb->row);
338          mdb->row = NULL;
339       }
340
341       mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields);
342
343       // now reset the row_number now that we have the space allocated
344       mdb->row_number = 0;
345    }
346
347    // if still within the result set
348    if (mdb->row_number < mdb->num_rows) {
349       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
350       // get each value from this row
351       for (j = 0; j < mdb->num_fields; j++) {
352          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
353          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
354       }
355       // increment the row number for the next call
356       mdb->row_number++;
357
358       row = mdb->row;
359    } else {
360       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
361    }
362
363    Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
364
365    return row;
366 }
367
368 int my_postgresql_max_length(B_DB *mdb, int field_num) {
369    //
370    // for a given column, find the max length
371    //
372    int max_length;
373    int i;
374    int this_length;
375
376    max_length = 0;
377    for (i = 0; i < mdb->num_rows; i++) {
378       if (PQgetisnull(mdb->result, i, field_num)) {
379           this_length = 4;        // "NULL"
380       } else {
381           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
382       }
383
384       if (max_length < this_length) {
385           max_length = this_length;
386       }
387    }
388
389    return max_length;
390 }
391
392 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
393 {
394    int     i;
395
396    Dmsg0(500, "my_postgresql_fetch_field starts\n");
397    if (mdb->fields == NULL) {
398       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
399       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
400
401       for (i = 0; i < mdb->num_fields; i++) {
402          Dmsg1(500, "filling field %d\n", i);
403          mdb->fields[i].name           = PQfname(mdb->result, i);
404          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
405          mdb->fields[i].type       = PQftype(mdb->result, i);
406          mdb->fields[i].flags      = 0;
407
408          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
409             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
410             mdb->fields[i].flags);
411       } // end for
412    } // end if
413
414    // increment field number for the next time around
415
416    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
417    return &mdb->fields[mdb->field_number++];
418 }
419
420 void my_postgresql_data_seek(B_DB *mdb, int row)
421 {
422    // set the row number to be returned on the next call
423    // to my_postgresql_fetch_row
424    mdb->row_number = row;
425 }
426
427 void my_postgresql_field_seek(B_DB *mdb, int field)
428 {
429    mdb->field_number = field;
430 }
431
432 /*
433  * Note, if this routine returns 1 (failure), Bacula expects
434  *  that no result has been stored.
435  */
436 int my_postgresql_query(B_DB *mdb, const char *query) {
437    Dmsg0(500, "my_postgresql_query started\n");
438    // We are starting a new query.  reset everything.
439    mdb->num_rows     = -1;
440    mdb->row_number   = -1;
441    mdb->field_number = -1;
442
443    if (mdb->result != NULL) {
444       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
445    }
446
447    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
448    mdb->result = PQexec(mdb->db, query);
449    mdb->status = PQresultStatus(mdb->result);
450    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
451       Dmsg1(500, "we have a result\n", query);
452
453       // how many fields in the set?
454       mdb->num_fields = (int) PQnfields(mdb->result);
455       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
456
457       mdb->num_rows   = PQntuples(mdb->result);
458       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
459
460       mdb->status = 0;
461    } else {
462       Dmsg1(500, "we failed\n", query);
463       mdb->status = 1;
464    }
465
466    Dmsg0(500, "my_postgresql_query finishing\n");
467
468    return mdb->status;
469 }
470
471 void my_postgresql_free_result (B_DB *mdb)
472 {
473    if (mdb->result) {
474       PQclear(mdb->result);
475       mdb->result = NULL;
476    }
477
478    if (mdb->row) {
479       free(mdb->row);
480       mdb->row = NULL;
481    }
482
483    if (mdb->fields) {
484       free(mdb->fields);
485       mdb->fields = NULL;
486    }
487 }
488
489 int my_postgresql_currval(B_DB *mdb, char *table_name)
490 {
491    // Obtain the current value of the sequence that
492    // provides the serial value for primary key of the table.
493
494    // currval is local to our session.  It is not affected by
495    // other transactions.
496
497    // Determine the name of the sequence.
498    // PostgreSQL automatically creates a sequence using
499    // <table>_<column>_seq.
500    // At the time of writing, all tables used this format for
501    // for their primary key: <table>id
502    // Except for basefiles which has a primary key on baseid.
503    // Therefore, we need to special case that one table.
504
505    // everything else can use the PostgreSQL formula.
506
507    char      sequence[NAMEDATALEN-1];
508    char      query   [NAMEDATALEN+50];
509    PGresult *result;
510    int       id = 0;
511
512    if (strcasecmp(table_name, "basefiles") == 0) {
513       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
514    } else {
515       bstrncpy(sequence, table_name, sizeof(sequence));
516       bstrncat(sequence, "_",        sizeof(sequence));
517       bstrncat(sequence, table_name, sizeof(sequence));
518       bstrncat(sequence, "id",       sizeof(sequence));
519    }
520
521    bstrncat(sequence, "_seq", sizeof(sequence));
522    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
523
524 // Mmsg(query, "SELECT currval('%s')", sequence);
525    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
526    result = PQexec(mdb->db, query);
527
528    Dmsg0(500, "exec done");
529
530    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
531       Dmsg0(500, "getting value");
532       id = atoi(PQgetvalue(result, 0, 0));
533       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
534    } else {
535       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
536    }
537
538    PQclear(result);
539
540    return id;
541 }
542
543 int my_postgresql_lock_table(B_DB *mdb, const char *table)
544 {
545    my_postgresql_query(mdb, "BEGIN");
546    Mmsg(mdb->cmd, "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", table);
547    return my_postgresql_query(mdb, mdb->cmd);
548 }
549
550 int my_postgresql_unlock_table(B_DB *mdb)
551 {
552    return my_postgresql_query(mdb, "COMMIT");
553 }
554
555 int my_postgresql_batch_start(B_DB *mdb)
556 {
557    Dmsg0(500, "my_postgresql_batch_start started\n");
558
559    if (my_postgresql_query(mdb,
560                            " CREATE TEMPORARY TABLE batch "
561                            "        (fileindex int,       "
562                            "        jobid int,            "
563                            "        path varchar,         "
564                            "        name varchar,         "
565                            "        lstat varchar,        "
566                            "        md5 varchar)") == 1)
567    {
568       Dmsg0(500, "my_postgresql_batch_start failed\n");
569       return 1;
570    }
571    
572    // We are starting a new query.  reset everything.
573    mdb->num_rows     = -1;
574    mdb->row_number   = -1;
575    mdb->field_number = -1;
576
577    if (mdb->result != NULL) {
578       my_postgresql_free_result(mdb);
579    }
580
581    mdb->result = PQexec(mdb->db, "COPY batch FROM STDIN");
582    mdb->status = PQresultStatus(mdb->result);
583    if (mdb->status == PGRES_COPY_IN) {
584       // how many fields in the set?
585       mdb->num_fields = (int) PQnfields(mdb->result);
586       mdb->num_rows   = 0;
587       mdb->status = 0;
588    } else {
589       Dmsg0(500, "we failed\n");
590       mdb->status = 1;
591    }
592
593    Dmsg0(500, "my_postgresql_batch_start finishing\n");
594
595    return mdb->status;
596 }
597
598 /* set error to something to abort operation */
599 int my_postgresql_batch_end(B_DB *mdb, const char *error)
600 {
601    int res;
602    int count=30;
603    Dmsg0(500, "my_postgresql_batch_end started\n");
604
605    if (!mdb) {                  /* no files ? */
606       return 0;
607    }
608
609    do { 
610       res = PQputCopyEnd(mdb->db, error);
611    } while (res == 0 && --count > 0);
612
613    if (res == 1) {
614       Dmsg0(500, "ok\n");
615       mdb->status = 0;
616    }
617    
618    if (res <= 0) {
619       Dmsg0(500, "we failed\n");
620       mdb->status = 1;
621       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
622    }
623    
624    Dmsg0(500, "my_postgresql_batch_end finishing\n");
625
626    return mdb->status;
627 }
628
629 int my_postgresql_batch_insert(B_DB *mdb, ATTR_DBR *ar)
630 {
631    int res;
632    int count=30;
633    size_t len;
634    char *digest;
635    char ed1[50];
636
637    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
638    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
639
640    mdb->esc_name2 = check_pool_memory_size(mdb->esc_name2, mdb->pnl*2+1);
641    my_postgresql_copy_escape(mdb->esc_name2, mdb->path, mdb->pnl);
642
643    if (ar->Digest == NULL || ar->Digest[0] == 0) {
644       digest = "0";
645    } else {
646       digest = ar->Digest;
647    }
648
649    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
650               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->path, 
651               mdb->fname, ar->attr, digest);
652
653    do { 
654       res = PQputCopyData(mdb->db,
655                           mdb->cmd,
656                           len);
657    } while (res == 0 && --count > 0);
658
659    if (res == 1) {
660       Dmsg0(500, "ok\n");
661       mdb->changes++;
662       mdb->status = 0;
663    }
664
665    if (res <= 0) {
666       Dmsg0(500, "we failed\n");
667       mdb->status = 1;
668       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
669    }
670
671    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
672
673    return mdb->status;
674 }
675
676 /*
677  * Escape strings so that PostgreSQL is happy on COPY
678  *
679  *   NOTE! len is the length of the old string. Your new
680  *         string must be long enough (max 2*old+1) to hold
681  *         the escaped output.
682  */
683 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
684 {
685    /* we have to escape \t, \n, \r, \ */
686    char c = '\0' ;
687
688    while (len > 0 && *src) {
689       switch (*src) {
690       case '\n':
691          c = 'n';
692          break;
693       case '\\':
694          c = '\\';
695          break;
696       case '\t':
697          c = 't';
698          break;
699       case '\r':
700          c = 'r';
701          break;
702       default:
703          c = '\0' ;
704       }
705
706       if (c) {
707          *dest = '\\';
708          dest++;
709          *dest = c;
710       } else {
711          *dest = *src;
712       }
713
714       len--;
715       src++;
716       dest++;
717    }
718
719    *dest = '\0';
720    return dest;
721 }
722
723 char *my_pg_batch_lock_path_query = "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
724
725
726 char *my_pg_batch_lock_filename_query = "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
727
728 char *my_pg_batch_unlock_tables_query = "COMMIT";
729
730 char *my_pg_batch_fill_path_query = "INSERT INTO Path (Path)                                    "
731                                     "  SELECT a.Path FROM                                       "
732                                     "      (SELECT DISTINCT Path FROM batch) AS a               "
733                                     "  WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
734
735
736 char *my_pg_batch_fill_filename_query = "INSERT INTO Filename (Name)        "
737                                         "  SELECT a.Name FROM               "
738                                         "    (SELECT DISTINCT Name FROM batch) as a "
739                                         "    WHERE NOT EXISTS               "
740                                         "      (SELECT Name FROM Filename WHERE Name = a.Name)";
741 #endif /* HAVE_POSTGRESQL */