]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
Fix stupid inverted logic
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  *    Version $Id$
37  */
38 /*
39  * This code only compiles against a recent version of libdbi. The current
40  * release found on the libdbi website (0.8.3) won't work for this code.
41  *
42  * You find the libdbi library on http://sourceforge.net/projects/libdbi
43  *
44  * A fairly recent version of libdbi from CVS works, so either make sure
45  * your distribution has a fairly recent version of libdbi installed or
46  * clone the CVS repositories from sourceforge and compile that code and
47  * install it.
48  *
49  * You need:
50  * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
51  * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
52  */
53
54
55 /* The following is necessary so that we do not include
56  * the dummy external definition of DB.
57  */
58 #define __SQL_C                       /* indicate that this is sql.c */
59
60 #include "bacula.h"
61 #include "cats.h"
62
63 #ifdef HAVE_DBI
64
65 /* -----------------------------------------------------------------------
66  *
67  *   DBI dependent defines and subroutines
68  *
69  * -----------------------------------------------------------------------
70  */
71
72 /* List of open databases */
73 static BQUEUE db_list = {&db_list, &db_list};
74
75 /* Control allocated fields by my_dbi_getvalue */
76 static BQUEUE dbi_getvalue_list = {&dbi_getvalue_list, &dbi_getvalue_list};
77
78 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
79
80 /*
81  * Retrieve database type
82  */
83 const char *
84 db_get_type(void)
85 {
86    return "DBI";
87 }
88
89 /*
90  * Initialize database data structure. In principal this should
91  * never have errors, or it is really fatal.
92  */
93 B_DB *
94 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
95                  const char *db_address, int db_port, const char *db_socket,
96                  int mult_db_connections)
97 {
98    B_DB *mdb;
99    char db_driver[10];
100    char db_driverdir[256];
101
102    /* Constraint the db_driver */
103    if(db_type  == -1) {
104       Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
105       return NULL;
106    }
107
108    /* Do the correct selection of driver.
109     * Can be one of the varius supported by libdbi
110     */
111    switch (db_type) {
112    case SQL_TYPE_MYSQL:
113       bstrncpy(db_driver,"mysql", sizeof(db_driver));
114       break;
115    case SQL_TYPE_POSTGRESQL:
116       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
117       break;
118    case SQL_TYPE_SQLITE:
119       bstrncpy(db_driver,"sqlite", sizeof(db_driver));
120       break;
121    case SQL_TYPE_SQLITE3:
122       bstrncpy(db_driver,"sqlite3", sizeof(db_driver));
123       break;
124    }
125
126    /* Set db_driverdir whereis is the libdbi drivers */
127    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
128
129    if (!db_user) {
130       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
131       return NULL;
132    }
133    P(mutex);                          /* lock DB queue */
134    if (!mult_db_connections) {
135       /* Look to see if DB already open */
136       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
137          if (bstrcmp(mdb->db_name, db_name) &&
138              bstrcmp(mdb->db_address, db_address) &&
139              bstrcmp(mdb->db_driver, db_driver) &&
140              mdb->db_port == db_port) {
141             Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name,
142                   dbi_conn_error(mdb->db, NULL));
143             mdb->ref_count++;
144             V(mutex);
145             return mdb;                  /* already open */
146          }
147       }
148    }
149    Dmsg0(100, "db_open first time\n");
150    mdb = (B_DB *)malloc(sizeof(B_DB));
151    memset(mdb, 0, sizeof(B_DB));
152    mdb->db_name = bstrdup(db_name);
153    mdb->db_user = bstrdup(db_user);
154    if (db_password) {
155       mdb->db_password = bstrdup(db_password);
156    }
157    if (db_address) {
158       mdb->db_address  = bstrdup(db_address);
159    }
160    if (db_socket) {
161       mdb->db_socket   = bstrdup(db_socket);
162    }
163    if (db_driverdir) {
164       mdb->db_driverdir = bstrdup(db_driverdir);
165    }
166    if (db_driver) {
167       mdb->db_driver    = bstrdup(db_driver);
168    }
169    mdb->db_type        = db_type;
170    mdb->db_port        = db_port;
171    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
172    *mdb->errmsg        = 0;
173    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
174    mdb->cached_path    = get_pool_memory(PM_FNAME);
175    mdb->cached_path_id = 0;
176    mdb->ref_count      = 1;
177    mdb->fname          = get_pool_memory(PM_FNAME);
178    mdb->path           = get_pool_memory(PM_FNAME);
179    mdb->esc_name       = get_pool_memory(PM_FNAME);
180    mdb->esc_path      = get_pool_memory(PM_FNAME);
181    mdb->allow_transactions = mult_db_connections;
182    qinsert(&db_list, &mdb->bq);            /* put db in list */
183    V(mutex);
184    return mdb;
185 }
186
187 /*
188  * Now actually open the database.  This can generate errors,
189  *   which are returned in the errmsg
190  *
191  * DO NOT close the database or free(mdb) here  !!!!
192  */
193 int
194 db_open_database(JCR *jcr, B_DB *mdb)
195 {
196    int errstat;
197    int dbstat;
198    uint8_t len;
199    const char *errmsg;
200    char buf[10], *port;
201    int numdrivers;
202    char *db_name = NULL;
203    char *db_dir = NULL;
204
205    P(mutex);
206    if (mdb->connected) {
207       V(mutex);
208       return 1;
209    }
210    mdb->connected = false;
211
212    if ((errstat=rwl_init(&mdb->lock)) != 0) {
213       berrno be;
214       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
215             be.bstrerror(errstat));
216       V(mutex);
217       return 0;
218    }
219
220    if (mdb->db_port) {
221       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
222       port = buf;
223    } else {
224       port = NULL;
225    }
226
227    numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
228    if (numdrivers < 0) {
229       Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
230                                "db_driverdir=%s. It is probaly not found any drivers\n"),
231                                mdb->db_driverdir,numdrivers);
232       V(mutex);
233       return 0;
234    }
235    mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
236    /* Can be many types of databases */
237    switch (mdb->db_type) {
238    case SQL_TYPE_MYSQL:
239       dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
240       dbi_conn_set_option(mdb->db, "port", port);            /* default port */
241       dbi_conn_set_option(mdb->db, "username", mdb->db_user);     /* login name */
242       dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
243       dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);       /* database name */
244       break;
245    case SQL_TYPE_POSTGRESQL:
246       dbi_conn_set_option(mdb->db, "host", mdb->db_address);
247       dbi_conn_set_option(mdb->db, "port", port);
248       dbi_conn_set_option(mdb->db, "username", mdb->db_user);
249       dbi_conn_set_option(mdb->db, "password", mdb->db_password);
250       dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);
251       break;
252    case SQL_TYPE_SQLITE:
253       len = strlen(working_directory) + 5;
254       db_dir = (char *)malloc(len);
255       strcpy(db_dir, working_directory);
256       strcat(db_dir, "/");
257       len = strlen(mdb->db_name) + 5;
258       db_name = (char *)malloc(len);
259       strcpy(db_name, mdb->db_name);
260       strcat(db_name, ".db");
261       dbi_conn_set_option(mdb->db, "sqlite_dbdir", db_dir);
262       dbi_conn_set_option(mdb->db, "dbname", db_name);
263       break;
264    case SQL_TYPE_SQLITE3:
265       len = strlen(working_directory) + 5;
266       db_dir = (char *)malloc(len);
267       strcpy(db_dir, working_directory);
268       strcat(db_dir, "/");
269       len = strlen(mdb->db_name) + 5;
270       db_name = (char *)malloc(len);
271       strcpy(db_name, mdb->db_name);
272       strcat(db_name, ".db");
273       dbi_conn_set_option(mdb->db, "sqlite3_dbdir", db_dir);
274       dbi_conn_set_option(mdb->db, "dbname", db_name);
275       Dmsg2(500, "SQLITE: %s %s\n", db_dir, db_name);
276       break;
277    }
278
279    /* If connection fails, try at 5 sec intervals for 30 seconds. */
280    for (int retry=0; retry < 6; retry++) {
281
282       dbstat = dbi_conn_connect(mdb->db);
283       if ( dbstat == 0) {
284          break;
285       }
286
287       dbi_conn_error(mdb->db, &errmsg);
288       Dmsg1(50, "dbi error: %s\n", errmsg);
289
290       bmicrosleep(5, 0);
291
292    }
293
294    if ( dbstat != 0 ) {
295       Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
296          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
297          mdb->db_driver, mdb->db_name, mdb->db_user);
298       V(mutex);
299       return 0;
300    }
301
302    Dmsg0(50, "dbi_real_connect done\n");
303    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
304                     mdb->db_user, mdb->db_name,
305                     mdb->db_password==NULL?"(NULL)":mdb->db_password);
306
307    mdb->connected = true;
308
309    if (!check_tables_version(jcr, mdb)) {
310       V(mutex);
311       return 0;
312    }
313
314    switch (mdb->db_type) {
315    case SQL_TYPE_MYSQL:
316       /* Set connection timeout to 8 days specialy for batch mode */
317       sql_query(mdb, "SET wait_timeout=691200");
318       sql_query(mdb, "SET interactive_timeout=691200");
319       break;
320    case SQL_TYPE_POSTGRESQL:
321       /* tell PostgreSQL we are using standard conforming strings
322          and avoid warnings such as:
323          WARNING:  nonstandard use of \\ in a string literal
324       */
325       sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
326       sql_query(mdb, "set standard_conforming_strings=on");
327       break;
328    }
329
330    if(db_dir) {
331       free(db_dir);
332    }
333    if(db_name) {
334       free(db_name);
335    }
336
337    V(mutex);
338    return 1;
339 }
340
341 void
342 db_close_database(JCR *jcr, B_DB *mdb)
343 {
344    if (!mdb) {
345       return;
346    }
347    db_end_transaction(jcr, mdb);
348    P(mutex);
349    sql_free_result(mdb);
350    mdb->ref_count--;
351    if (mdb->ref_count == 0) {
352       qdchain(&mdb->bq);
353       if (mdb->connected && mdb->db) {
354          //sql_close(mdb);
355          dbi_shutdown_r(mdb->instance);
356          mdb->db = NULL;
357          mdb->instance = NULL;
358       }
359       rwl_destroy(&mdb->lock);
360       free_pool_memory(mdb->errmsg);
361       free_pool_memory(mdb->cmd);
362       free_pool_memory(mdb->cached_path);
363       free_pool_memory(mdb->fname);
364       free_pool_memory(mdb->path);
365       free_pool_memory(mdb->esc_name);
366       free_pool_memory(mdb->esc_path);
367       if (mdb->db_name) {
368          free(mdb->db_name);
369       }
370       if (mdb->db_user) {
371          free(mdb->db_user);
372       }
373       if (mdb->db_password) {
374          free(mdb->db_password);
375       }
376       if (mdb->db_address) {
377          free(mdb->db_address);
378       }
379       if (mdb->db_socket) {
380          free(mdb->db_socket);
381       }
382       if (mdb->db_driverdir) {
383          free(mdb->db_driverdir);
384       }
385       if (mdb->db_driver) {
386           free(mdb->db_driver);
387       }
388       free(mdb);
389    }
390    V(mutex);
391 }
392
393 void db_check_backend_thread_safe()
394 { }
395
396 void db_thread_cleanup()
397 { }
398
399 /*
400  * Return the next unique index (auto-increment) for
401  * the given table.  Return NULL on error.
402  *
403  */
404 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
405 {
406    strcpy(index, "NULL");
407    return 1;
408 }
409
410
411 /*
412  * Escape strings so that DBI is happy
413  *
414  *   NOTE! len is the length of the old string. Your new
415  *         string must be long enough (max 2*old+1) to hold
416  *         the escaped output.
417  *
418  * dbi_conn_quote_string_copy receives a pointer to pointer.
419  * We need copy the value of pointer to snew because libdbi change the
420  * pointer
421  */
422 void
423 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
424 {
425    char *inew;
426    char *pnew;
427
428    if (len == 0) {
429       snew[0] = 0;
430    } else {
431       /* correct the size of old basead in len
432        * and copy new string to inew
433        */
434       inew = (char *)malloc(sizeof(char) * len + 1);
435       bstrncpy(inew,old,len + 1);
436       /* escape the correct size of old */
437       dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
438       free(inew);
439       /* copy the escaped string to snew */
440       bstrncpy(snew, pnew, 2 * len + 1);
441    }
442
443    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
444
445 }
446
447 /*
448  * Submit a general SQL command (cmd), and for each row returned,
449  *  the sqlite_handler is called with the ctx.
450  */
451 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
452 {
453    SQL_ROW row;
454
455    Dmsg0(500, "db_sql_query started\n");
456
457    db_lock(mdb);
458    if (sql_query(mdb, query) != 0) {
459       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
460       db_unlock(mdb);
461       Dmsg0(500, "db_sql_query failed\n");
462       return false;
463    }
464    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
465
466    if (result_handler != NULL) {
467       Dmsg0(500, "db_sql_query invoking handler\n");
468       if ((mdb->result = sql_store_result(mdb)) != NULL) {
469          int num_fields = sql_num_fields(mdb);
470
471          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
472          while ((row = sql_fetch_row(mdb)) != NULL) {
473
474             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
475             if (result_handler(ctx, num_fields, row))
476                break;
477          }
478
479         sql_free_result(mdb);
480       }
481    }
482    db_unlock(mdb);
483
484    Dmsg0(500, "db_sql_query finished\n");
485
486    return true;
487 }
488
489
490
491 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
492 {
493    int j;
494    DBI_ROW row = NULL; // by default, return NULL
495
496    Dmsg0(500, "my_dbi_fetch_row start\n");
497    if ((!mdb->row || mdb->row_size < mdb->num_fields) && mdb->num_rows > 0) {
498       int num_fields = mdb->num_fields;
499       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
500
501       if (mdb->row) {
502          Dmsg0(500, "my_dbi_fetch_row freeing space\n");
503          Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
504          if (mdb->num_rows != 0) {
505             for(j = 0; j < mdb->num_fields; j++) {
506                Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
507                   if(mdb->row[j]) {
508                      free(mdb->row[j]);
509                   }
510             }
511          }
512          free(mdb->row);
513       }
514       //num_fields += 20;                  /* add a bit extra */
515       mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
516       mdb->row_size = num_fields;
517
518       // now reset the row_number now that we have the space allocated
519       mdb->row_number = 1;
520    }
521
522    // if still within the result set
523    if (mdb->row_number <= mdb->num_rows && mdb->row_number != DBI_ERROR_BADPTR) {
524       Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
525       // get each value from this row
526       for (j = 0; j < mdb->num_fields; j++) {
527          mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
528          // allocate space to queue row
529          mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
530          // store the pointer in queue
531          mdb->field_get->value = mdb->row[j];
532          Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
533                j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
534          // insert in queue to future free
535          qinsert(&dbi_getvalue_list, &mdb->field_get->bq);
536       }
537       // increment the row number for the next call
538       mdb->row_number++;
539
540       row = mdb->row;
541    } else {
542       Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
543    }
544
545    Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
546
547    return row;
548 }
549
550 int my_dbi_max_length(B_DB *mdb, int field_num) {
551    //
552    // for a given column, find the max length
553    //
554    int max_length;
555    int i;
556    int this_length;
557    char *cbuf = NULL;
558
559    max_length = 0;
560    for (i = 0; i < mdb->num_rows; i++) {
561       if (my_dbi_getisnull(mdb->result, i, field_num)) {
562           this_length = 4;        // "NULL"
563       } else {
564          cbuf = my_dbi_getvalue(mdb->result, i, field_num);
565          this_length = cstrlen(cbuf);
566          // cbuf is always free
567          free(cbuf);
568       }
569
570       if (max_length < this_length) {
571           max_length = this_length;
572       }
573    }
574
575    return max_length;
576 }
577
578 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
579 {
580    int     i;
581    int     dbi_index;
582
583    Dmsg0(500, "my_dbi_fetch_field starts\n");
584
585    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
586       if (mdb->fields) {
587          free(mdb->fields);
588       }
589       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
590       mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
591       mdb->fields_size = mdb->num_fields;
592
593       for (i = 0; i < mdb->num_fields; i++) {
594          // num_fileds is starting at 1, increment i by 1
595          dbi_index = i + 1;
596          Dmsg1(500, "filling field %d\n", i);
597          mdb->fields[i].name       = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
598          mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
599          mdb->fields[i].type       = dbi_result_get_field_type_idx(mdb->result, dbi_index);
600          mdb->fields[i].flags      = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
601
602          Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
603             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
604             mdb->fields[i].flags);
605       } // end for
606    } // end if
607
608    // increment field number for the next time around
609
610    Dmsg0(500, "my_dbi_fetch_field finishes\n");
611    return &mdb->fields[mdb->field_number++];
612 }
613
614 void my_dbi_data_seek(B_DB *mdb, int row)
615 {
616    // set the row number to be returned on the next call
617    // to my_dbi_fetch_row
618    mdb->row_number = row;
619 }
620
621 void my_dbi_field_seek(B_DB *mdb, int field)
622 {
623    mdb->field_number = field;
624 }
625
626 /*
627  * Note, if this routine returns 1 (failure), Bacula expects
628  *  that no result has been stored.
629  *
630  *  Returns:  0  on success
631  *            1  on failure
632  *
633  */
634 int my_dbi_query(B_DB *mdb, const char *query)
635 {
636    const char *errmsg;
637    Dmsg1(500, "my_dbi_query started %s\n", query);
638    // We are starting a new query.  reset everything.
639    mdb->num_rows     = -1;
640    mdb->row_number   = -1;
641    mdb->field_number = -1;
642
643    if (mdb->result) {
644       dbi_result_free(mdb->result);  /* hmm, someone forgot to free?? */
645       mdb->result = NULL;
646    }
647
648    mdb->result = (void **)dbi_conn_query(mdb->db, query);
649
650    if (!mdb->result) {
651       Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);
652       goto bail_out;
653    }
654
655    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
656
657    if (mdb->status == DBI_ERROR_NONE) {
658       Dmsg1(500, "we have a result\n", query);
659
660       // how many fields in the set?
661       // num_fields starting at 1
662       mdb->num_fields = dbi_result_get_numfields(mdb->result);
663       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
664       // if no result num_rows is 0
665       mdb->num_rows = dbi_result_get_numrows(mdb->result);
666       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
667
668       mdb->status = (dbi_error_flag) 0;                  /* succeed */
669    } else {
670       Dmsg1(50, "Result status failed: %s\n", query);
671       goto bail_out;
672    }
673
674    Dmsg0(500, "my_dbi_query finishing\n");
675    return mdb->status;
676
677 bail_out:
678    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
679    //dbi_conn_error(mdb->db, &errmsg);
680    Dmsg4(500, "my_dbi_query we failed dbi error: "
681                    "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
682    dbi_result_free(mdb->result);
683    mdb->result = NULL;
684    mdb->status = (dbi_error_flag) 1;                   /* failed */
685    return mdb->status;
686 }
687
688 void my_dbi_free_result(B_DB *mdb)
689 {
690
691    DBI_FIELD_GET *f;
692    db_lock(mdb);
693    if (mdb->result) {
694       Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
695       dbi_result_free(mdb->result);
696    }
697
698    mdb->result = NULL;
699
700    if (mdb->row) {
701       free(mdb->row);
702    }
703
704    /* now is time to free all value return by my_dbi_get_value
705     * this is necessary because libdbi don't free memory return by yours results
706     * and Bacula has some routine wich call more than once time my_dbi_fetch_row
707     *
708     * Using a queue to store all pointer allocate is a good way to free all things
709     * when necessary
710     */
711    while((f=(DBI_FIELD_GET *)qremove(&dbi_getvalue_list))) {
712       Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
713       free(f->value);
714       free(f);
715    }
716
717    mdb->row = NULL;
718
719    if (mdb->fields) {
720       free(mdb->fields);
721       mdb->fields = NULL;
722    }
723    db_unlock(mdb);
724    Dmsg0(500, "my_dbi_free_result finish\n");
725
726 }
727
728 const char *my_dbi_strerror(B_DB *mdb)
729 {
730    const char *errmsg;
731
732    dbi_conn_error(mdb->db, &errmsg);
733
734    return errmsg;
735 }
736
737 #ifdef HAVE_BATCH_FILE_INSERT
738
739 /*
740  * This can be a bit strang but is the one way to do
741  *
742  * Returns 1 if OK
743  *         0 if failed
744  */
745 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
746 {
747    char *query = "COPY batch FROM STDIN";
748
749    Dmsg0(500, "my_dbi_batch_start started\n");
750
751    switch (mdb->db_type) {
752    case SQL_TYPE_MYSQL:
753       db_lock(mdb);
754       if (my_dbi_query(mdb,
755                               "CREATE TEMPORARY TABLE batch ("
756                                   "FileIndex integer,"
757                                   "JobId integer,"
758                                   "Path blob,"
759                                   "Name blob,"
760                                   "LStat tinyblob,"
761                                   "MD5 tinyblob)") == 1)
762       {
763          Dmsg0(500, "my_dbi_batch_start failed\n");
764          return 1;
765       }
766       db_unlock(mdb);
767       Dmsg0(500, "my_dbi_batch_start finishing\n");
768       return 1;
769       break;
770    case SQL_TYPE_POSTGRESQL:
771
772       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
773                                   "fileindex int,"
774                                   "jobid int,"
775                                   "path varchar,"
776                                   "name varchar,"
777                                   "lstat varchar,"
778                                   "md5 varchar)") == 1)
779       {
780          Dmsg0(500, "my_dbi_batch_start failed\n");
781          return 1;
782       }
783
784       // We are starting a new query.  reset everything.
785       mdb->num_rows     = -1;
786       mdb->row_number   = -1;
787       mdb->field_number = -1;
788
789       my_dbi_free_result(mdb);
790
791       for (int i=0; i < 10; i++) {
792          my_dbi_query(mdb, query);
793          if (mdb->result) {
794             break;
795          }
796          bmicrosleep(5, 0);
797       }
798       if (!mdb->result) {
799          Dmsg1(50, "Query failed: %s\n", query);
800          goto bail_out;
801       }
802
803       mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
804       //mdb->status = DBI_ERROR_NONE;
805
806       if (mdb->status == DBI_ERROR_NONE) {
807          // how many fields in the set?
808          mdb->num_fields = dbi_result_get_numfields(mdb->result);
809          mdb->num_rows   = dbi_result_get_numrows(mdb->result);
810          mdb->status = (dbi_error_flag) 1;
811       } else {
812          Dmsg1(50, "Result status failed: %s\n", query);
813          goto bail_out;
814       }
815
816       Dmsg0(500, "my_postgresql_batch_start finishing\n");
817
818       return mdb->status;
819       break;
820    case SQL_TYPE_SQLITE:
821       db_lock(mdb);
822       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
823                                   "FileIndex integer,"
824                                   "JobId integer,"
825                                   "Path blob,"
826                                   "Name blob,"
827                                   "LStat tinyblob,"
828                                   "MD5 tinyblob)") == 1)
829       {
830          Dmsg0(500, "my_dbi_batch_start failed\n");
831          goto bail_out;
832       }
833       db_unlock(mdb);
834       Dmsg0(500, "my_dbi_batch_start finishing\n");
835       return 1;
836       break;
837    case SQL_TYPE_SQLITE3:
838       db_lock(mdb);
839       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
840                                   "FileIndex integer,"
841                                   "JobId integer,"
842                                   "Path blob,"
843                                   "Name blob,"
844                                   "LStat tinyblob,"
845                                   "MD5 tinyblob)") == 1)
846       {
847          Dmsg0(500, "my_dbi_batch_start failed\n");
848          goto bail_out;
849       }
850       db_unlock(mdb);
851       Dmsg0(500, "my_dbi_batch_start finishing\n");
852       return 1;
853       break;
854    }
855
856 bail_out:
857    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
858    mdb->status = (dbi_error_flag) 0;
859    my_dbi_free_result(mdb);
860    mdb->result = NULL;
861    return mdb->status;
862 }
863
864 /* set error to something to abort operation */
865 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
866 {
867    int res = 0;
868    int count = 30;
869    int (*custom_function)(void*, const char*) = NULL;
870    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
871
872    Dmsg0(500, "my_dbi_batch_end started\n");
873
874    if (!mdb) {                  /* no files ? */
875       return 0;
876    }
877
878    switch (mdb->db_type) {
879    case SQL_TYPE_MYSQL:
880       if(mdb) {
881          mdb->status = (dbi_error_flag) 0;
882       }
883       break;
884    case SQL_TYPE_POSTGRESQL:
885       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
886
887
888       do {
889          res = (*custom_function)(myconn->connection, error);
890       } while (res == 0 && --count > 0);
891
892       if (res == 1) {
893          Dmsg0(500, "ok\n");
894          mdb->status = (dbi_error_flag) 1;
895       }
896
897       if (res <= 0) {
898          Dmsg0(500, "we failed\n");
899          mdb->status = (dbi_error_flag) 0;
900          //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
901        }
902       break;
903    case SQL_TYPE_SQLITE:
904       if(mdb) {
905          mdb->status = (dbi_error_flag) 0;
906       }
907       break;
908    case SQL_TYPE_SQLITE3:
909       if(mdb) {
910          mdb->status = (dbi_error_flag) 0;
911       }
912       break;
913    }
914
915    Dmsg0(500, "my_dbi_batch_end finishing\n");
916
917    return true;
918 }
919
920 /*
921  * This function is big and use a big switch.
922  * In near future is better split in small functions
923  * and refactory.
924  *
925  */
926 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
927 {
928    int res;
929    int count=30;
930    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
931    int (*custom_function)(void*, const char*, int) = NULL;
932    char* (*custom_function_error)(void*) = NULL;
933    size_t len;
934    char *digest;
935    char ed1[50];
936
937    Dmsg0(500, "my_dbi_batch_insert started \n");
938
939    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
940    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
941
942    if (ar->Digest == NULL || ar->Digest[0] == 0) {
943       digest = "0";
944    } else {
945       digest = ar->Digest;
946    }
947
948    switch (mdb->db_type) {
949    case SQL_TYPE_MYSQL:
950       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
951       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
952       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
953                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
954                       mdb->esc_name, ar->attr, digest);
955
956       if (my_dbi_query(mdb,mdb->cmd) == 1)
957       {
958          Dmsg0(500, "my_dbi_batch_insert failed\n");
959          goto bail_out;
960       }
961
962       Dmsg0(500, "my_dbi_batch_insert finishing\n");
963
964       return 1;
965       break;
966    case SQL_TYPE_POSTGRESQL:
967       my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
968       my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
969       len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
970                      ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
971                      mdb->esc_name, ar->attr, digest);
972
973       /* libdbi don't support CopyData and we need call a postgresql
974        * specific function to do this work
975        */
976       Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
977       if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
978             "PQputCopyData")) != NULL) {
979          do {
980             res = (*custom_function)(myconn->connection, mdb->cmd, len);
981          } while (res == 0 && --count > 0);
982
983          if (res == 1) {
984             Dmsg0(500, "ok\n");
985             mdb->changes++;
986             mdb->status = (dbi_error_flag) 1;
987          }
988
989          if (res <= 0) {
990             Dmsg0(500, "my_dbi_batch_insert failed\n");
991             goto bail_out;
992          }
993
994          Dmsg0(500, "my_dbi_batch_insert finishing\n");
995          return mdb->status;
996       } else {
997          // ensure to detect a PQerror
998          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
999          Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1000          goto bail_out;
1001       }
1002       break;
1003    case SQL_TYPE_SQLITE:
1004       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1005       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1006       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1007                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1008                       mdb->esc_name, ar->attr, digest);
1009       if (my_dbi_query(mdb,mdb->cmd) == 1)
1010       {
1011          Dmsg0(500, "my_dbi_batch_insert failed\n");
1012          goto bail_out;
1013       }
1014
1015       Dmsg0(500, "my_dbi_batch_insert finishing\n");
1016
1017       return 1;
1018       break;
1019    case SQL_TYPE_SQLITE3:
1020       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1021       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1022       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1023                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1024                       mdb->esc_name, ar->attr, digest);
1025       if (my_dbi_query(mdb,mdb->cmd) == 1)
1026       {
1027          Dmsg0(500, "my_dbi_batch_insert failed\n");
1028          goto bail_out;
1029       }
1030
1031       Dmsg0(500, "my_dbi_batch_insert finishing\n");
1032
1033       return 1;
1034       break;
1035    }
1036
1037 bail_out:
1038   Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
1039   mdb->status = (dbi_error_flag) 0;
1040   my_dbi_free_result(mdb);
1041   return mdb->status;
1042 }
1043
1044 /*
1045  * Escape strings so that PostgreSQL is happy on COPY
1046  *
1047  *   NOTE! len is the length of the old string. Your new
1048  *         string must be long enough (max 2*old+1) to hold
1049  *         the escaped output.
1050  */
1051 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
1052 {
1053    /* we have to escape \t, \n, \r, \ */
1054    char c = '\0' ;
1055
1056    while (len > 0 && *src) {
1057       switch (*src) {
1058       case '\n':
1059          c = 'n';
1060          break;
1061       case '\\':
1062          c = '\\';
1063          break;
1064       case '\t':
1065          c = 't';
1066          break;
1067       case '\r':
1068          c = 'r';
1069          break;
1070       default:
1071          c = '\0' ;
1072       }
1073
1074       if (c) {
1075          *dest = '\\';
1076          dest++;
1077          *dest = c;
1078       } else {
1079          *dest = *src;
1080       }
1081
1082       len--;
1083       src++;
1084       dest++;
1085    }
1086
1087    *dest = '\0';
1088    return dest;
1089 }
1090
1091 #endif /* HAVE_BATCH_FILE_INSERT */
1092
1093 /* my_dbi_getisnull
1094  * like PQgetisnull
1095  * int PQgetisnull(const PGresult *res,
1096  *              int row_number,
1097  *               int column_number);
1098  *
1099  *  use dbi_result_seek_row to search in result set
1100  */
1101 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1102    int i;
1103
1104    if(row_number == 0) {
1105       row_number++;
1106    }
1107
1108    column_number++;
1109
1110    if(dbi_result_seek_row(result, row_number)) {
1111
1112       i = dbi_result_field_is_null_idx(result,column_number);
1113
1114       return i;
1115    } else {
1116
1117       return 0;
1118    }
1119
1120 }
1121 /* my_dbi_getvalue
1122  * like PQgetvalue;
1123  * char *PQgetvalue(const PGresult *res,
1124  *                int row_number,
1125  *                int column_number);
1126  *
1127  * use dbi_result_seek_row to search in result set
1128  * use example to return only strings
1129  */
1130 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1131
1132    char *buf = NULL;
1133    const char *errmsg;
1134    const char *field_name;
1135    unsigned short dbitype;
1136    size_t field_length;
1137    int64_t num;
1138
1139    /* correct the index for dbi interface
1140     * dbi index begins 1
1141     * I prefer do not change others functions
1142     */
1143    Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
1144                                 result, row_number, column_number);
1145
1146    column_number++;
1147
1148    if(row_number == 0) {
1149      row_number++;
1150    }
1151
1152    Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
1153                         result, row_number, column_number);
1154
1155    if(dbi_result_seek_row(result, row_number)) {
1156
1157       field_name = dbi_result_get_field_name(result, column_number);
1158       field_length = dbi_result_get_field_length(result, field_name);
1159       dbitype = dbi_result_get_field_type_idx(result,column_number);
1160
1161       Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
1162             "field_length bytes: '%d' fieldname: '%s'\n",
1163             dbitype, field_length, field_name);
1164
1165       if(field_length) {
1166          //buf = (char *)malloc(sizeof(char *) * field_length + 1);
1167          buf = (char *)malloc(field_length + 1);
1168       } else {
1169          /* if numbers */
1170          buf = (char *)malloc(sizeof(char *) * 50);
1171       }
1172
1173       switch (dbitype) {
1174       case DBI_TYPE_INTEGER:
1175          num = dbi_result_get_longlong(result, field_name);
1176          edit_int64(num, buf);
1177          field_length = strlen(buf);
1178          break;
1179       case DBI_TYPE_STRING:
1180          if(field_length) {
1181             field_length = bsnprintf(buf, field_length + 1, "%s",
1182             dbi_result_get_string(result, field_name));
1183          } else {
1184             buf[0] = 0;
1185          }
1186          break;
1187       case DBI_TYPE_BINARY:
1188          /* dbi_result_get_binary return a NULL pointer if value is empty
1189          * following, change this to what Bacula espected
1190          */
1191          if(field_length) {
1192             field_length = bsnprintf(buf, field_length + 1, "%s",
1193                   dbi_result_get_binary(result, field_name));
1194          } else {
1195             buf[0] = 0;
1196          }
1197          break;
1198       case DBI_TYPE_DATETIME:
1199          time_t last;
1200          struct tm tm;
1201
1202          last = dbi_result_get_datetime(result, field_name);
1203
1204          if(last == -1) {
1205                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
1206          } else {
1207             (void)localtime_r(&last, &tm);
1208             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1209                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1210                   tm.tm_hour, tm.tm_min, tm.tm_sec);
1211          }
1212          break;
1213       }
1214
1215    } else {
1216       dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1217       Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1218    }
1219
1220    Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
1221       buf, field_length, buf);
1222
1223    // don't worry about this buf
1224    return buf;
1225 }
1226
1227 static int my_dbi_sequence_last(B_DB *mdb, const char *table_name)
1228 {
1229    /*
1230     Obtain the current value of the sequence that
1231     provides the serial value for primary key of the table.
1232
1233     currval is local to our session.  It is not affected by
1234     other transactions.
1235
1236     Determine the name of the sequence.
1237     PostgreSQL automatically creates a sequence using
1238     <table>_<column>_seq.
1239     At the time of writing, all tables used this format for
1240     for their primary key: <table>id
1241     Except for basefiles which has a primary key on baseid.
1242     Therefore, we need to special case that one table.
1243
1244     everything else can use the PostgreSQL formula.
1245    */
1246
1247    char sequence[30];
1248    uint64_t id = 0;
1249
1250    if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
1251
1252       if (strcasecmp(table_name, "basefiles") == 0) {
1253          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1254       } else {
1255          bstrncpy(sequence, table_name, sizeof(sequence));
1256          bstrncat(sequence, "_",        sizeof(sequence));
1257          bstrncat(sequence, table_name, sizeof(sequence));
1258          bstrncat(sequence, "id",       sizeof(sequence));
1259       }
1260
1261       bstrncat(sequence, "_seq", sizeof(sequence));
1262       id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1263    } else {
1264       id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
1265    }
1266
1267    return id;
1268 }
1269
1270 int my_dbi_sql_insert_id(B_DB *mdb, const char *query, const char *table_name)
1271 {
1272    /*
1273     * First execute the insert query and then retrieve the currval.
1274     */
1275    if (my_dbi_query(mdb, query)) {
1276       return 0;
1277    }
1278
1279    mdb->num_rows = sql_affected_rows(mdb);
1280    if (mdb->num_rows != 1) {
1281       return 0;
1282    }
1283
1284    mdb->changes++;
1285
1286    return my_dbi_sequence_last(mdb, table_name);
1287 }
1288
1289 #ifdef HAVE_BATCH_FILE_INSERT
1290 const char *my_dbi_batch_lock_path_query[4] = {
1291    /* Mysql */
1292    "LOCK TABLES Path write, batch write, Path as p write",
1293    /* Postgresql */
1294    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
1295    /* SQLite */
1296    "BEGIN",
1297    /* SQLite3 */
1298    "BEGIN",
1299    /* Ingres */
1300    "BEGIN"
1301 };
1302
1303 const char *my_dbi_batch_lock_filename_query[4] = {
1304    /* Mysql */
1305    "LOCK TABLES Filename write, batch write, Filename as f write",
1306    /* Postgresql */
1307    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
1308    /* SQLite */
1309    "BEGIN",
1310    /* SQLite3 */
1311    "BEGIN",
1312    /* Ingres */
1313    "BEGIN"
1314 };
1315
1316 const char *my_dbi_batch_unlock_tables_query[4] = {
1317    /* Mysql */
1318    "UNLOCK TABLES",
1319    /* Postgresql */
1320    "COMMIT",
1321    /* SQLite */
1322    "COMMIT",
1323    /* SQLite3 */
1324    "COMMIT"};
1325
1326 const char *my_dbi_batch_fill_path_query[4] = {
1327    /* Mysql */
1328    "INSERT INTO Path (Path) "
1329    "SELECT a.Path FROM "
1330    "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1331    "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1332    /* Postgresql */
1333    "INSERT INTO Path (Path) "
1334    "SELECT a.Path FROM "
1335    "(SELECT DISTINCT Path FROM batch) AS a "
1336    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1337    /* SQLite */
1338    "INSERT INTO Path (Path)"
1339    " SELECT DISTINCT Path FROM batch"
1340    " EXCEPT SELECT Path FROM Path",
1341    /* SQLite3 */
1342    "INSERT INTO Path (Path)"
1343    " SELECT DISTINCT Path FROM batch"
1344    " EXCEPT SELECT Path FROM Path",
1345    /* Ingres */
1346    "INSERT INTO Path (Path) "
1347    "SELECT a.Path FROM "
1348    "(SELECT DISTINCT Path FROM batch) AS a "
1349    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "
1350 };
1351
1352 const char *my_dbi_batch_fill_filename_query[4] = {
1353    /* Mysql */
1354    "INSERT INTO Filename (Name) "
1355    "SELECT a.Name FROM "
1356    "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1357    "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1358    /* Postgresql */
1359    "INSERT INTO Filename (Name) "
1360    "SELECT a.Name FROM "
1361    "(SELECT DISTINCT Name FROM batch) as a "
1362    "WHERE NOT EXISTS "
1363    "(SELECT Name FROM Filename WHERE Name = a.Name)",
1364    /* SQLite */
1365    "INSERT INTO Filename (Name)"
1366    " SELECT DISTINCT Name FROM batch "
1367    " EXCEPT SELECT Name FROM Filename",
1368    /* SQLite3 */
1369    "INSERT INTO Filename (Name)"
1370    " SELECT DISTINCT Name FROM batch "
1371    " EXCEPT SELECT Name FROM Filename",
1372    /* Ingres */
1373    "INSERT INTO Filename (Name) "
1374    "SELECT a.Name FROM "
1375    "(SELECT DISTINCT Name FROM batch) as a "
1376    "WHERE NOT EXISTS "
1377    "(SELECT Name FROM Filename WHERE Name = a.Name)"
1378 };
1379
1380 #endif /* HAVE_BATCH_FILE_INSERT */
1381
1382 const char *my_dbi_match[4] = {
1383    /* Mysql */
1384    "MATCH",
1385    /* Postgresql */
1386    "~",
1387    /* SQLite */
1388    "MATCH",
1389    /* SQLite3 */
1390    "MATCH"
1391 };
1392
1393 #endif /* HAVE_DBI */