]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
kes Fix restore before command.
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation plus additions
11    that are listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to PostgreSQL
30  *   These are PostgreSQL specific routines
31  *
32  *    Dan Langille, December 2003
33  *    based upon work done by Kern Sibbald, March 2000
34  *
35  *    Version $Id$
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_POSTGRESQL
48
49 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
50
51 /* -----------------------------------------------------------------------
52  *
53  *   PostgreSQL dependent defines and subroutines
54  *
55  * -----------------------------------------------------------------------
56  */
57
58 /* List of open databases */
59 static BQUEUE db_list = {&db_list, &db_list};
60
61 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
62
63 /*
64  * Retrieve database type
65  */
66 const char *
67 db_get_type(void)
68 {
69    return "PostgreSQL";
70
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (!mult_db_connections) {
90       /* Look to see if DB already open */
91       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
92          if (bstrcmp(mdb->db_name, db_name) &&
93              bstrcmp(mdb->db_address, db_address) &&
94              mdb->db_port == db_port) {
95             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
96             mdb->ref_count++;
97             V(mutex);
98             return mdb;                  /* already open */
99          }
100       }
101    }
102    Dmsg0(100, "db_open first time\n");
103    mdb = (B_DB *)malloc(sizeof(B_DB));
104    memset(mdb, 0, sizeof(B_DB));
105    mdb->db_name = bstrdup(db_name);
106    mdb->db_user = bstrdup(db_user);
107    if (db_password) {
108       mdb->db_password = bstrdup(db_password);
109    }
110    if (db_address) {
111       mdb->db_address  = bstrdup(db_address);
112    }
113    if (db_socket) {
114       mdb->db_socket   = bstrdup(db_socket);
115    }
116    mdb->db_port        = db_port;
117    mdb->have_insert_id = TRUE;
118    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
119    *mdb->errmsg        = 0;
120    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
121    mdb->cached_path    = get_pool_memory(PM_FNAME);
122    mdb->cached_path_id = 0;
123    mdb->ref_count      = 1;
124    mdb->fname          = get_pool_memory(PM_FNAME);
125    mdb->path           = get_pool_memory(PM_FNAME);
126    mdb->esc_name       = get_pool_memory(PM_FNAME);
127    mdb->esc_path      = get_pool_memory(PM_FNAME);
128    mdb->allow_transactions = mult_db_connections;
129    qinsert(&db_list, &mdb->bq);            /* put db in list */
130    V(mutex);
131    return mdb;
132 }
133
134 /*
135  * Now actually open the database.  This can generate errors,
136  *   which are returned in the errmsg
137  *
138  * DO NOT close the database or free(mdb) here !!!!
139  */
140 int
141 db_open_database(JCR *jcr, B_DB *mdb)
142 {
143    int errstat;
144    char buf[10], *port;
145
146    P(mutex);
147    if (mdb->connected) {
148       V(mutex);
149       return 1;
150    }
151    mdb->connected = false;
152
153    if ((errstat=rwl_init(&mdb->lock)) != 0) {
154       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
155             strerror(errstat));
156       V(mutex);
157       return 0;
158    }
159
160    if (mdb->db_port) {
161       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
162       port = buf;
163    } else {
164       port = NULL;
165    }
166
167    /* If connection fails, try at 5 sec intervals for 30 seconds. */
168    for (int retry=0; retry < 6; retry++) {
169       /* connect to the database */
170       mdb->db = PQsetdbLogin(
171            mdb->db_address,           /* default = localhost */
172            port,                      /* default port */
173            NULL,                      /* pg options */
174            NULL,                      /* tty, ignored */
175            mdb->db_name,              /* database name */
176            mdb->db_user,              /* login name */
177            mdb->db_password);         /* password */
178
179       /* If no connect, try once more in case it is a timing problem */
180       if (PQstatus(mdb->db) == CONNECTION_OK) {
181          break;
182       }
183       bmicrosleep(5, 0);
184    }
185
186    Dmsg0(50, "pg_real_connect done\n");
187    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
188             mdb->db_password==NULL?"(NULL)":mdb->db_password);
189
190    if (PQstatus(mdb->db) != CONNECTION_OK) {
191       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
192             "Database=%s User=%s\n"
193             "It is probably not running or your password is incorrect.\n"),
194              mdb->db_name, mdb->db_user);
195       V(mutex);
196       return 0;
197    }
198
199    mdb->connected = true;
200
201    if (!check_tables_version(jcr, mdb)) {
202       V(mutex);
203       return 0;
204    }
205
206    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
207
208    V(mutex);
209    return 1;
210 }
211
212 void
213 db_close_database(JCR *jcr, B_DB *mdb)
214 {
215    if (!mdb) {
216       return;
217    }
218    db_end_transaction(jcr, mdb);
219    P(mutex);
220    sql_free_result(mdb);
221    mdb->ref_count--;
222    if (mdb->ref_count == 0) {
223       qdchain(&mdb->bq);
224       if (mdb->connected && mdb->db) {
225          sql_close(mdb);
226       }
227       rwl_destroy(&mdb->lock);
228       free_pool_memory(mdb->errmsg);
229       free_pool_memory(mdb->cmd);
230       free_pool_memory(mdb->cached_path);
231       free_pool_memory(mdb->fname);
232       free_pool_memory(mdb->path);
233       free_pool_memory(mdb->esc_name);
234       free_pool_memory(mdb->esc_path);
235       if (mdb->db_name) {
236          free(mdb->db_name);
237       }
238       if (mdb->db_user) {
239          free(mdb->db_user);
240       }
241       if (mdb->db_password) {
242          free(mdb->db_password);
243       }
244       if (mdb->db_address) {
245          free(mdb->db_address);
246       }
247       if (mdb->db_socket) {
248          free(mdb->db_socket);
249       }
250       my_postgresql_free_result(mdb);
251       free(mdb);
252    }
253    V(mutex);
254 }
255
256 /*
257  * Return the next unique index (auto-increment) for
258  * the given table.  Return NULL on error.
259  *
260  * For PostgreSQL, NULL causes the auto-increment value
261  *  to be updated.
262  */
263 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
264 {
265    strcpy(index, "NULL");
266    return 1;
267 }
268
269
270 /*
271  * Escape strings so that PostgreSQL is happy
272  *
273  *   NOTE! len is the length of the old string. Your new
274  *         string must be long enough (max 2*old+1) to hold
275  *         the escaped output.
276  */
277 void
278 db_escape_string(char *snew, char *old, int len)
279 {
280    PQescapeString(snew, old, len);
281 }
282
283 /*
284  * Submit a general SQL command (cmd), and for each row returned,
285  *  the sqlite_handler is called with the ctx.
286  */
287 int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
288 {
289    SQL_ROW row;
290
291    Dmsg0(500, "db_sql_query started\n");
292
293    db_lock(mdb);
294    if (sql_query(mdb, query) != 0) {
295       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
296       db_unlock(mdb);
297       Dmsg0(500, "db_sql_query failed\n");
298       return 0;
299    }
300    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
301
302    if (result_handler != NULL) {
303       Dmsg0(500, "db_sql_query invoking handler\n");
304       if ((mdb->result = sql_store_result(mdb)) != NULL) {
305          int num_fields = sql_num_fields(mdb);
306
307          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
308          while ((row = sql_fetch_row(mdb)) != NULL) {
309
310             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
311             if (result_handler(ctx, num_fields, row))
312                break;
313          }
314
315         sql_free_result(mdb);
316       }
317    }
318    db_unlock(mdb);
319
320    Dmsg0(500, "db_sql_query finished\n");
321
322    return 1;
323 }
324
325
326
327 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
328 {
329    int j;
330    POSTGRESQL_ROW row = NULL; // by default, return NULL
331
332    Dmsg0(500, "my_postgresql_fetch_row start\n");
333
334    if (mdb->row_number == -1 || mdb->row == NULL) {
335       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
336
337       if (mdb->row != NULL) {
338          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
339          free(mdb->row);
340          mdb->row = NULL;
341       }
342
343       mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields);
344
345       // now reset the row_number now that we have the space allocated
346       mdb->row_number = 0;
347    }
348
349    // if still within the result set
350    if (mdb->row_number < mdb->num_rows) {
351       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
352       // get each value from this row
353       for (j = 0; j < mdb->num_fields; j++) {
354          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
355          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
356       }
357       // increment the row number for the next call
358       mdb->row_number++;
359
360       row = mdb->row;
361    } else {
362       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
363    }
364
365    Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
366
367    return row;
368 }
369
370 int my_postgresql_max_length(B_DB *mdb, int field_num) {
371    //
372    // for a given column, find the max length
373    //
374    int max_length;
375    int i;
376    int this_length;
377
378    max_length = 0;
379    for (i = 0; i < mdb->num_rows; i++) {
380       if (PQgetisnull(mdb->result, i, field_num)) {
381           this_length = 4;        // "NULL"
382       } else {
383           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
384       }
385
386       if (max_length < this_length) {
387           max_length = this_length;
388       }
389    }
390
391    return max_length;
392 }
393
394 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
395 {
396    int     i;
397
398    Dmsg0(500, "my_postgresql_fetch_field starts\n");
399    if (mdb->fields == NULL) {
400       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
401       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
402
403       for (i = 0; i < mdb->num_fields; i++) {
404          Dmsg1(500, "filling field %d\n", i);
405          mdb->fields[i].name           = PQfname(mdb->result, i);
406          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
407          mdb->fields[i].type       = PQftype(mdb->result, i);
408          mdb->fields[i].flags      = 0;
409
410          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
411             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
412             mdb->fields[i].flags);
413       } // end for
414    } // end if
415
416    // increment field number for the next time around
417
418    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
419    return &mdb->fields[mdb->field_number++];
420 }
421
422 void my_postgresql_data_seek(B_DB *mdb, int row)
423 {
424    // set the row number to be returned on the next call
425    // to my_postgresql_fetch_row
426    mdb->row_number = row;
427 }
428
429 void my_postgresql_field_seek(B_DB *mdb, int field)
430 {
431    mdb->field_number = field;
432 }
433
434 /*
435  * Note, if this routine returns 1 (failure), Bacula expects
436  *  that no result has been stored.
437  */
438 int my_postgresql_query(B_DB *mdb, const char *query) {
439    Dmsg0(500, "my_postgresql_query started\n");
440    // We are starting a new query.  reset everything.
441    mdb->num_rows     = -1;
442    mdb->row_number   = -1;
443    mdb->field_number = -1;
444
445    if (mdb->result != NULL) {
446       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
447    }
448
449    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
450    mdb->result = PQexec(mdb->db, query);
451    mdb->status = PQresultStatus(mdb->result);
452    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
453       Dmsg1(500, "we have a result\n", query);
454
455       // how many fields in the set?
456       mdb->num_fields = (int) PQnfields(mdb->result);
457       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
458
459       mdb->num_rows   = PQntuples(mdb->result);
460       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
461
462       mdb->status = 0;
463    } else {
464       Dmsg1(500, "we failed\n", query);
465       mdb->status = 1;
466    }
467
468    Dmsg0(500, "my_postgresql_query finishing\n");
469
470    return mdb->status;
471 }
472
473 void my_postgresql_free_result(B_DB *mdb)
474 {
475    if (mdb->result) {
476       PQclear(mdb->result);
477       mdb->result = NULL;
478    }
479
480    if (mdb->row) {
481       free(mdb->row);
482       mdb->row = NULL;
483    }
484
485    if (mdb->fields) {
486       free(mdb->fields);
487       mdb->fields = NULL;
488    }
489 }
490
491 int my_postgresql_currval(B_DB *mdb, char *table_name)
492 {
493    // Obtain the current value of the sequence that
494    // provides the serial value for primary key of the table.
495
496    // currval is local to our session.  It is not affected by
497    // other transactions.
498
499    // Determine the name of the sequence.
500    // PostgreSQL automatically creates a sequence using
501    // <table>_<column>_seq.
502    // At the time of writing, all tables used this format for
503    // for their primary key: <table>id
504    // Except for basefiles which has a primary key on baseid.
505    // Therefore, we need to special case that one table.
506
507    // everything else can use the PostgreSQL formula.
508
509    char      sequence[NAMEDATALEN-1];
510    char      query   [NAMEDATALEN+50];
511    PGresult *result;
512    int       id = 0;
513
514    if (strcasecmp(table_name, "basefiles") == 0) {
515       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
516    } else {
517       bstrncpy(sequence, table_name, sizeof(sequence));
518       bstrncat(sequence, "_",        sizeof(sequence));
519       bstrncat(sequence, table_name, sizeof(sequence));
520       bstrncat(sequence, "id",       sizeof(sequence));
521    }
522
523    bstrncat(sequence, "_seq", sizeof(sequence));
524    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
525
526 // Mmsg(query, "SELECT currval('%s')", sequence);
527    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
528    result = PQexec(mdb->db, query);
529
530    Dmsg0(500, "exec done");
531
532    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
533       Dmsg0(500, "getting value");
534       id = atoi(PQgetvalue(result, 0, 0));
535       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
536    } else {
537       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
538    }
539
540    PQclear(result);
541
542    return id;
543 }
544
545 int my_postgresql_batch_start(JCR *jcr, B_DB *mdb)
546 {
547    Dmsg0(500, "my_postgresql_batch_start started\n");
548
549    if (my_postgresql_query(mdb,
550                            " CREATE TEMPORARY TABLE batch "
551                            "        (fileindex int,       "
552                            "        jobid int,            "
553                            "        path varchar,         "
554                            "        name varchar,         "
555                            "        lstat varchar,        "
556                            "        md5 varchar)") == 1)
557    {
558       Dmsg0(500, "my_postgresql_batch_start failed\n");
559       return 1;
560    }
561    
562    // We are starting a new query.  reset everything.
563    mdb->num_rows     = -1;
564    mdb->row_number   = -1;
565    mdb->field_number = -1;
566
567    if (mdb->result != NULL) {
568       my_postgresql_free_result(mdb);
569    }
570
571    mdb->result = PQexec(mdb->db, "COPY batch FROM STDIN");
572    mdb->status = PQresultStatus(mdb->result);
573    if (mdb->status == PGRES_COPY_IN) {
574       // how many fields in the set?
575       mdb->num_fields = (int) PQnfields(mdb->result);
576       mdb->num_rows   = 0;
577       mdb->status = 1;
578    } else {
579       Dmsg0(500, "we failed\n");
580       mdb->status = 0;
581    }
582
583    Dmsg0(500, "my_postgresql_batch_start finishing\n");
584
585    return mdb->status;
586 }
587
588 /* set error to something to abort operation */
589 int my_postgresql_batch_end(JCR *jcr, B_DB *mdb, const char *error)
590 {
591    int res;
592    int count=30;
593    Dmsg0(500, "my_postgresql_batch_end started\n");
594
595    if (!mdb) {                  /* no files ? */
596       return 0;
597    }
598
599    do { 
600       res = PQputCopyEnd(mdb->db, error);
601    } while (res == 0 && --count > 0);
602
603    if (res == 1) {
604       Dmsg0(500, "ok\n");
605       mdb->status = 1;
606    }
607    
608    if (res <= 0) {
609       Dmsg0(500, "we failed\n");
610       mdb->status = 0;
611       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
612    }
613    
614    Dmsg0(500, "my_postgresql_batch_end finishing\n");
615
616    return mdb->status;
617 }
618
619 int my_postgresql_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
620 {
621    int res;
622    int count=30;
623    size_t len;
624    char *digest;
625    char ed1[50];
626
627    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
628    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
629
630    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
631    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
632
633    if (ar->Digest == NULL || ar->Digest[0] == 0) {
634       digest = "0";
635    } else {
636       digest = ar->Digest;
637    }
638
639    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
640               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
641               mdb->esc_name, ar->attr, digest);
642
643    do { 
644       res = PQputCopyData(mdb->db,
645                           mdb->cmd,
646                           len);
647    } while (res == 0 && --count > 0);
648
649    if (res == 1) {
650       Dmsg0(500, "ok\n");
651       mdb->changes++;
652       mdb->status = 1;
653    }
654
655    if (res <= 0) {
656       Dmsg0(500, "we failed\n");
657       mdb->status = 0;
658       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s\n"), PQerrorMessage(mdb->db));
659    }
660
661    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
662
663    return mdb->status;
664 }
665
666 /*
667  * Escape strings so that PostgreSQL is happy on COPY
668  *
669  *   NOTE! len is the length of the old string. Your new
670  *         string must be long enough (max 2*old+1) to hold
671  *         the escaped output.
672  */
673 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
674 {
675    /* we have to escape \t, \n, \r, \ */
676    char c = '\0' ;
677
678    while (len > 0 && *src) {
679       switch (*src) {
680       case '\n':
681          c = 'n';
682          break;
683       case '\\':
684          c = '\\';
685          break;
686       case '\t':
687          c = 't';
688          break;
689       case '\r':
690          c = 'r';
691          break;
692       default:
693          c = '\0' ;
694       }
695
696       if (c) {
697          *dest = '\\';
698          dest++;
699          *dest = c;
700       } else {
701          *dest = *src;
702       }
703
704       len--;
705       src++;
706       dest++;
707    }
708
709    *dest = '\0';
710    return dest;
711 }
712
713 char *my_pg_batch_lock_path_query = "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
714
715
716 char *my_pg_batch_lock_filename_query = "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
717
718 char *my_pg_batch_unlock_tables_query = "COMMIT";
719
720 char *my_pg_batch_fill_path_query = "INSERT INTO Path (Path)                                    "
721                                     "  SELECT a.Path FROM                                       "
722                                     "      (SELECT DISTINCT Path FROM batch) AS a               "
723                                     "  WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
724
725
726 char *my_pg_batch_fill_filename_query = "INSERT INTO Filename (Name)        "
727                                         "  SELECT a.Name FROM               "
728                                         "    (SELECT DISTINCT Name FROM batch) as a "
729                                         "    WHERE NOT EXISTS               "
730                                         "      (SELECT Name FROM Filename WHERE Name = a.Name)";
731 #endif /* HAVE_POSTGRESQL */