]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
ebl fix batch escape string bug
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2  * Bacula Catalog Database routines specific to PostgreSQL
3  *   These are PostgreSQL specific routines
4  *
5  *    Dan Langille, December 2003
6  *    based upon work done by Kern Sibbald, March 2000
7  *
8  *    Version $Id$
9  */
10 /*
11    Bacula® - The Network Backup Solution
12
13    Copyright (C) 2003-2006 Free Software Foundation Europe e.V.
14
15    The main author of Bacula is Kern Sibbald, with contributions from
16    many others, a complete list can be found in the file AUTHORS.
17    This program is Free Software; you can redistribute it and/or
18    modify it under the terms of version two of the GNU General Public
19    License as published by the Free Software Foundation plus additions
20    that are listed in the file LICENSE.
21
22    This program is distributed in the hope that it will be useful, but
23    WITHOUT ANY WARRANTY; without even the implied warranty of
24    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
25    General Public License for more details.
26
27    You should have received a copy of the GNU General Public License
28    along with this program; if not, write to the Free Software
29    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
30    02110-1301, USA.
31
32    Bacula® is a registered trademark of John Walker.
33    The licensor of Bacula is the Free Software Foundation Europe
34    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
35    Switzerland, email:ftf@fsfeurope.org.
36 */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static BQUEUE db_list = {&db_list, &db_list};
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (!mult_db_connections) {
90       /* Look to see if DB already open */
91       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
92          if (bstrcmp(mdb->db_name, db_name) &&
93              bstrcmp(mdb->db_address, db_address) &&
94              mdb->db_port == db_port) {
95             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
96             mdb->ref_count++;
97             V(mutex);
98             return mdb;                  /* already open */
99          }
100       }
101    }
102    Dmsg0(100, "db_open first time\n");
103    mdb = (B_DB *)malloc(sizeof(B_DB));
104    memset(mdb, 0, sizeof(B_DB));
105    mdb->db_name = bstrdup(db_name);
106    mdb->db_user = bstrdup(db_user);
107    if (db_password) {
108       mdb->db_password = bstrdup(db_password);
109    }
110    if (db_address) {
111       mdb->db_address  = bstrdup(db_address);
112    }
113    if (db_socket) {
114       mdb->db_socket   = bstrdup(db_socket);
115    }
116    mdb->db_port        = db_port;
117    mdb->have_insert_id = TRUE;
118    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
119    *mdb->errmsg        = 0;
120    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
121    mdb->cached_path    = get_pool_memory(PM_FNAME);
122    mdb->cached_path_id = 0;
123    mdb->ref_count      = 1;
124    mdb->fname          = get_pool_memory(PM_FNAME);
125    mdb->path           = get_pool_memory(PM_FNAME);
126    mdb->esc_name       = get_pool_memory(PM_FNAME);
127    mdb->esc_name2      = get_pool_memory(PM_FNAME);
128    mdb->allow_transactions = mult_db_connections;
129    qinsert(&db_list, &mdb->bq);            /* put db in list */
130    V(mutex);
131    return mdb;
132 }
133
134 /*
135  * Now actually open the database.  This can generate errors,
136  *   which are returned in the errmsg
137  *
138  * DO NOT close the database or free(mdb) here !!!!
139  */
140 int
141 db_open_database(JCR *jcr, B_DB *mdb)
142 {
143    int errstat;
144    char buf[10], *port;
145
146    P(mutex);
147    if (mdb->connected) {
148       V(mutex);
149       return 1;
150    }
151    mdb->connected = false;
152
153    if ((errstat=rwl_init(&mdb->lock)) != 0) {
154       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
155             strerror(errstat));
156       V(mutex);
157       return 0;
158    }
159
160    if (mdb->db_port) {
161       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
162       port = buf;
163    } else {
164       port = NULL;
165    }
166
167    /* If connection fails, try at 5 sec intervals for 30 seconds. */
168    for (int retry=0; retry < 6; retry++) {
169       /* connect to the database */
170       mdb->db = PQsetdbLogin(
171            mdb->db_address,           /* default = localhost */
172            port,                      /* default port */
173            NULL,                      /* pg options */
174            NULL,                      /* tty, ignored */
175            mdb->db_name,              /* database name */
176            mdb->db_user,              /* login name */
177            mdb->db_password);         /* password */
178
179       /* If no connect, try once more in case it is a timing problem */
180       if (PQstatus(mdb->db) == CONNECTION_OK) {
181          break;
182       }
183       bmicrosleep(5, 0);
184    }
185
186    Dmsg0(50, "pg_real_connect done\n");
187    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
188             mdb->db_password==NULL?"(NULL)":mdb->db_password);
189
190    if (PQstatus(mdb->db) != CONNECTION_OK) {
191       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
192             "Database=%s User=%s\n"
193             "It is probably not running or your password is incorrect.\n"),
194              mdb->db_name, mdb->db_user);
195       V(mutex);
196       return 0;
197    }
198
199    if (!check_tables_version(jcr, mdb)) {
200       V(mutex);
201       return 0;
202    }
203
204    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
205
206    mdb->connected = true;
207    V(mutex);
208    return 1;
209 }
210
211 void
212 db_close_database(JCR *jcr, B_DB *mdb)
213 {
214    if (!mdb) {
215       return;
216    }
217    db_end_transaction(jcr, mdb);
218    P(mutex);
219    mdb->ref_count--;
220    if (mdb->ref_count == 0) {
221       qdchain(&mdb->bq);
222       if (mdb->connected && mdb->db) {
223          sql_close(mdb);
224       }
225       rwl_destroy(&mdb->lock);
226       free_pool_memory(mdb->errmsg);
227       free_pool_memory(mdb->cmd);
228       free_pool_memory(mdb->cached_path);
229       free_pool_memory(mdb->fname);
230       free_pool_memory(mdb->path);
231       free_pool_memory(mdb->esc_name);
232       free_pool_memory(mdb->esc_name2);
233       if (mdb->db_name) {
234          free(mdb->db_name);
235       }
236       if (mdb->db_user) {
237          free(mdb->db_user);
238       }
239       if (mdb->db_password) {
240          free(mdb->db_password);
241       }
242       if (mdb->db_address) {
243          free(mdb->db_address);
244       }
245       if (mdb->db_socket) {
246          free(mdb->db_socket);
247       }
248       my_postgresql_free_result(mdb);
249       free(mdb);
250    }
251    V(mutex);
252 }
253
254 /*
255  * Return the next unique index (auto-increment) for
256  * the given table.  Return NULL on error.
257  *
258  * For PostgreSQL, NULL causes the auto-increment value
259  *  to be updated.
260  */
261 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
262 {
263    strcpy(index, "NULL");
264    return 1;
265 }
266
267
268 /*
269  * Escape strings so that PostgreSQL is happy
270  *
271  *   NOTE! len is the length of the old string. Your new
272  *         string must be long enough (max 2*old+1) to hold
273  *         the escaped output.
274  */
275 void
276 db_escape_string(char *snew, char *old, int len)
277 {
278    PQescapeString(snew, old, len);
279 }
280
281 /*
282  * Submit a general SQL command (cmd), and for each row returned,
283  *  the sqlite_handler is called with the ctx.
284  */
285 int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
286 {
287    SQL_ROW row;
288
289    Dmsg0(500, "db_sql_query started\n");
290
291    db_lock(mdb);
292    if (sql_query(mdb, query) != 0) {
293       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
294       db_unlock(mdb);
295       Dmsg0(500, "db_sql_query failed\n");
296       return 0;
297    }
298    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
299
300    if (result_handler != NULL) {
301       Dmsg0(500, "db_sql_query invoking handler\n");
302       if ((mdb->result = sql_store_result(mdb)) != NULL) {
303          int num_fields = sql_num_fields(mdb);
304
305          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
306          while ((row = sql_fetch_row(mdb)) != NULL) {
307
308             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
309             if (result_handler(ctx, num_fields, row))
310                break;
311          }
312
313         sql_free_result(mdb);
314       }
315    }
316    db_unlock(mdb);
317
318    Dmsg0(500, "db_sql_query finished\n");
319
320    return 1;
321 }
322
323
324
325 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
326 {
327    int j;
328    POSTGRESQL_ROW row = NULL; // by default, return NULL
329
330    Dmsg0(500, "my_postgresql_fetch_row start\n");
331
332    if (mdb->row_number == -1 || mdb->row == NULL) {
333       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
334
335       if (mdb->row != NULL) {
336          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
337          free(mdb->row);
338          mdb->row = NULL;
339       }
340
341       mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields);
342
343       // now reset the row_number now that we have the space allocated
344       mdb->row_number = 0;
345    }
346
347    // if still within the result set
348    if (mdb->row_number < mdb->num_rows) {
349       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
350       // get each value from this row
351       for (j = 0; j < mdb->num_fields; j++) {
352          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
353          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
354       }
355       // increment the row number for the next call
356       mdb->row_number++;
357
358       row = mdb->row;
359    } else {
360       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
361    }
362
363    Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
364
365    return row;
366 }
367
368 int my_postgresql_max_length(B_DB *mdb, int field_num) {
369    //
370    // for a given column, find the max length
371    //
372    int max_length;
373    int i;
374    int this_length;
375
376    max_length = 0;
377    for (i = 0; i < mdb->num_rows; i++) {
378       if (PQgetisnull(mdb->result, i, field_num)) {
379           this_length = 4;        // "NULL"
380       } else {
381           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
382       }
383
384       if (max_length < this_length) {
385           max_length = this_length;
386       }
387    }
388
389    return max_length;
390 }
391
392 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
393 {
394    int     i;
395
396    Dmsg0(500, "my_postgresql_fetch_field starts\n");
397    if (mdb->fields == NULL) {
398       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
399       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
400
401       for (i = 0; i < mdb->num_fields; i++) {
402          Dmsg1(500, "filling field %d\n", i);
403          mdb->fields[i].name           = PQfname(mdb->result, i);
404          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
405          mdb->fields[i].type       = PQftype(mdb->result, i);
406          mdb->fields[i].flags      = 0;
407
408          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
409             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
410             mdb->fields[i].flags);
411       } // end for
412    } // end if
413
414    // increment field number for the next time around
415
416    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
417    return &mdb->fields[mdb->field_number++];
418 }
419
420 void my_postgresql_data_seek(B_DB *mdb, int row)
421 {
422    // set the row number to be returned on the next call
423    // to my_postgresql_fetch_row
424    mdb->row_number = row;
425 }
426
427 void my_postgresql_field_seek(B_DB *mdb, int field)
428 {
429    mdb->field_number = field;
430 }
431
432 /*
433  * Note, if this routine returns 1 (failure), Bacula expects
434  *  that no result has been stored.
435  */
436 int my_postgresql_query(B_DB *mdb, const char *query) {
437    Dmsg0(500, "my_postgresql_query started\n");
438    // We are starting a new query.  reset everything.
439    mdb->num_rows     = -1;
440    mdb->row_number   = -1;
441    mdb->field_number = -1;
442
443    if (mdb->result != NULL) {
444       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
445    }
446
447    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
448    mdb->result = PQexec(mdb->db, query);
449    mdb->status = PQresultStatus(mdb->result);
450    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
451       Dmsg1(500, "we have a result\n", query);
452
453       // how many fields in the set?
454       mdb->num_fields = (int) PQnfields(mdb->result);
455       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
456
457       mdb->num_rows   = PQntuples(mdb->result);
458       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
459
460       mdb->status = 0;
461    } else {
462       Dmsg1(500, "we failed\n", query);
463       mdb->status = 1;
464    }
465
466    Dmsg0(500, "my_postgresql_query finishing\n");
467
468    return mdb->status;
469 }
470
471 void my_postgresql_free_result (B_DB *mdb)
472 {
473    if (mdb->result) {
474       PQclear(mdb->result);
475       mdb->result = NULL;
476    }
477
478    if (mdb->row) {
479       free(mdb->row);
480       mdb->row = NULL;
481    }
482
483    if (mdb->fields) {
484       free(mdb->fields);
485       mdb->fields = NULL;
486    }
487 }
488
489 int my_postgresql_currval(B_DB *mdb, char *table_name)
490 {
491    // Obtain the current value of the sequence that
492    // provides the serial value for primary key of the table.
493
494    // currval is local to our session.  It is not affected by
495    // other transactions.
496
497    // Determine the name of the sequence.
498    // PostgreSQL automatically creates a sequence using
499    // <table>_<column>_seq.
500    // At the time of writing, all tables used this format for
501    // for their primary key: <table>id
502    // Except for basefiles which has a primary key on baseid.
503    // Therefore, we need to special case that one table.
504
505    // everything else can use the PostgreSQL formula.
506
507    char      sequence[NAMEDATALEN-1];
508    char      query   [NAMEDATALEN+50];
509    PGresult *result;
510    int       id = 0;
511
512    if (strcasecmp(table_name, "basefiles") == 0) {
513       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
514    } else {
515       bstrncpy(sequence, table_name, sizeof(sequence));
516       bstrncat(sequence, "_",        sizeof(sequence));
517       bstrncat(sequence, table_name, sizeof(sequence));
518       bstrncat(sequence, "id",       sizeof(sequence));
519    }
520
521    bstrncat(sequence, "_seq", sizeof(sequence));
522    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
523
524 // Mmsg(query, "SELECT currval('%s')", sequence);
525    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
526    result = PQexec(mdb->db, query);
527
528    Dmsg0(500, "exec done");
529
530    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
531       Dmsg0(500, "getting value");
532       id = atoi(PQgetvalue(result, 0, 0));
533       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
534    } else {
535       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
536    }
537
538    PQclear(result);
539
540    return id;
541 }
542
543 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
544 {
545    Dmsg0(500, "my_postgresql_batch_start started\n");
546
547    if (my_postgresql_query(mdb,
548                            " CREATE TEMPORARY TABLE batch "
549                            "        (fileindex int,       "
550                            "        jobid int,            "
551                            "        path varchar,         "
552                            "        name varchar,         "
553                            "        lstat varchar,        "
554                            "        md5 varchar)") == 1)
555    {
556       Dmsg0(500, "my_postgresql_batch_start failed\n");
557       return 1;
558    }
559    
560    // We are starting a new query.  reset everything.
561    mdb->num_rows     = -1;
562    mdb->row_number   = -1;
563    mdb->field_number = -1;
564
565    if (mdb->result != NULL) {
566       my_postgresql_free_result(mdb);
567    }
568
569    mdb->result = PQexec(mdb->db, "COPY batch FROM STDIN");
570    mdb->status = PQresultStatus(mdb->result);
571    if (mdb->status == PGRES_COPY_IN) {
572       // how many fields in the set?
573       mdb->num_fields = (int) PQnfields(mdb->result);
574       mdb->num_rows   = 0;
575       mdb->status = 1;
576    } else {
577       Dmsg0(500, "we failed\n");
578       mdb->status = 0;
579    }
580
581    Dmsg0(500, "my_postgresql_batch_start finishing\n");
582
583    return mdb->status;
584 }
585
586 /* set error to something to abort operation */
587 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
588 {
589    int res;
590    int count=30;
591    Dmsg0(500, "my_postgresql_batch_end started\n");
592
593    if (!mdb) {                  /* no files ? */
594       return 0;
595    }
596
597    do { 
598       res = PQputCopyEnd(mdb->db, error);
599    } while (res == 0 && --count > 0);
600
601    if (res == 1) {
602       Dmsg0(500, "ok\n");
603       mdb->status = 1;
604    }
605    
606    if (res <= 0) {
607       Dmsg0(500, "we failed\n");
608       mdb->status = 0;
609       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
610    }
611    
612    Dmsg0(500, "my_postgresql_batch_end finishing\n");
613
614    return mdb->status;
615 }
616
617 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
618 {
619    int res;
620    int count=30;
621    size_t len;
622    char *digest;
623    char ed1[50];
624
625    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
626    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
627
628    mdb->esc_name2 = check_pool_memory_size(mdb->esc_name2, mdb->pnl*2+1);
629    my_postgresql_copy_escape(mdb->esc_name2, mdb->path, mdb->pnl);
630
631    if (ar->Digest == NULL || ar->Digest[0] == 0) {
632       digest = "0";
633    } else {
634       digest = ar->Digest;
635    }
636
637    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
638               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_name2, 
639               mdb->esc_name, ar->attr, digest);
640
641    do { 
642       res = PQputCopyData(mdb->db,
643                           mdb->cmd,
644                           len);
645    } while (res == 0 && --count > 0);
646
647    if (res == 1) {
648       Dmsg0(500, "ok\n");
649       mdb->changes++;
650       mdb->status = 1;
651    }
652
653    if (res <= 0) {
654       Dmsg0(500, "we failed\n");
655       mdb->status = 0;
656       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
657    }
658
659    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
660
661    return mdb->status;
662 }
663
664 /*
665  * Escape strings so that PostgreSQL is happy on COPY
666  *
667  *   NOTE! len is the length of the old string. Your new
668  *         string must be long enough (max 2*old+1) to hold
669  *         the escaped output.
670  */
671 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
672 {
673    /* we have to escape \t, \n, \r, \ */
674    char c = '\0' ;
675
676    while (len > 0 && *src) {
677       switch (*src) {
678       case '\n':
679          c = 'n';
680          break;
681       case '\\':
682          c = '\\';
683          break;
684       case '\t':
685          c = 't';
686          break;
687       case '\r':
688          c = 'r';
689          break;
690       default:
691          c = '\0' ;
692       }
693
694       if (c) {
695          *dest = '\\';
696          dest++;
697          *dest = c;
698       } else {
699          *dest = *src;
700       }
701
702       len--;
703       src++;
704       dest++;
705    }
706
707    *dest = '\0';
708    return dest;
709 }
710
711 char *my_pg_batch_lock_path_query = "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
712
713
714 char *my_pg_batch_lock_filename_query = "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
715
716 char *my_pg_batch_unlock_tables_query = "COMMIT";
717
718 char *my_pg_batch_fill_path_query = "INSERT INTO Path (Path)                                    "
719                                     "  SELECT a.Path FROM                                       "
720                                     "      (SELECT DISTINCT Path FROM batch) AS a               "
721                                     "  WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
722
723
724 char *my_pg_batch_fill_filename_query = "INSERT INTO Filename (Name)        "
725                                         "  SELECT a.Name FROM               "
726                                         "    (SELECT DISTINCT Name FROM batch) as a "
727                                         "    WHERE NOT EXISTS               "
728                                         "      (SELECT Name FROM Filename WHERE Name = a.Name)";
729 #endif /* HAVE_POSTGRESQL */