]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
Lets call it COMMIT and not END as that seems to be the counterpart of BEGIN for...
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  */
37 /*
38  * This code only compiles against a recent version of libdbi. The current
39  * release found on the libdbi website (0.8.3) won't work for this code.
40  *
41  * You find the libdbi library on http://sourceforge.net/projects/libdbi
42  *
43  * A fairly recent version of libdbi from CVS works, so either make sure
44  * your distribution has a fairly recent version of libdbi installed or
45  * clone the CVS repositories from sourceforge and compile that code and
46  * install it.
47  *
48  * You need:
49  * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
50  * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
51  */
52
53
54 /* The following is necessary so that we do not include
55  * the dummy external definition of DB.
56  */
57 #define __SQL_C                       /* indicate that this is sql.c */
58
59 #include "bacula.h"
60 #include "cats.h"
61
62 #ifdef HAVE_DBI
63
64 /* -----------------------------------------------------------------------
65  *
66  *   DBI dependent defines and subroutines
67  *
68  * -----------------------------------------------------------------------
69  */
70
71 /* List of open databases */
72 static dlist *db_list = NULL;
73
74 /* Control allocated fields by my_dbi_getvalue */
75 static dlist *dbi_getvalue_list = NULL;
76
77 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
78
79 /*
80  * Retrieve database type
81  */
82 const char *
83 db_get_type(void)
84 {
85    return "DBI";
86 }
87
88 /*
89  * Initialize database data structure. In principal this should
90  * never have errors, or it is really fatal.
91  */
92 B_DB *
93 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
94                  const char *db_address, int db_port, const char *db_socket,
95                  int mult_db_connections)
96 {
97    B_DB *mdb = NULL;
98    DBI_FIELD_GET *field;
99    char db_driver[10];
100    char db_driverdir[256];
101
102    /* Constraint the db_driver */
103    if(db_type  == -1) {
104       Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
105       return NULL;
106    }
107
108    /* Do the correct selection of driver.
109     * Can be one of the varius supported by libdbi
110     */
111    switch (db_type) {
112    case SQL_TYPE_MYSQL:
113       bstrncpy(db_driver,"mysql", sizeof(db_driver));
114       break;
115    case SQL_TYPE_POSTGRESQL:
116       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
117       break;
118    case SQL_TYPE_SQLITE:
119       bstrncpy(db_driver,"sqlite", sizeof(db_driver));
120       break;
121    case SQL_TYPE_SQLITE3:
122       bstrncpy(db_driver,"sqlite3", sizeof(db_driver));
123       break;
124    }
125
126    /* Set db_driverdir whereis is the libdbi drivers */
127    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
128
129    if (!db_user) {
130       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
131       return NULL;
132    }
133    P(mutex);                          /* lock DB queue */
134    if (db_list == NULL) {
135       db_list = New(dlist(mdb, &mdb->link));
136       dbi_getvalue_list = New(dlist(field, &field->link));
137    }
138    if (!mult_db_connections) {
139       /* Look to see if DB already open */
140       foreach_dlist(mdb, db_list) {
141          if (bstrcmp(mdb->db_name, db_name) &&
142              bstrcmp(mdb->db_address, db_address) &&
143              bstrcmp(mdb->db_driver, db_driver) &&
144              mdb->db_port == db_port) {
145             Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name,
146                   dbi_conn_error(mdb->db, NULL));
147             mdb->ref_count++;
148             V(mutex);
149             return mdb;                  /* already open */
150          }
151       }
152    }
153    Dmsg0(100, "db_open first time\n");
154    mdb = (B_DB *)malloc(sizeof(B_DB));
155    memset(mdb, 0, sizeof(B_DB));
156    mdb->db_name = bstrdup(db_name);
157    mdb->db_user = bstrdup(db_user);
158    if (db_password) {
159       mdb->db_password = bstrdup(db_password);
160    }
161    if (db_address) {
162       mdb->db_address  = bstrdup(db_address);
163    }
164    if (db_socket) {
165       mdb->db_socket   = bstrdup(db_socket);
166    }
167    if (db_driverdir) {
168       mdb->db_driverdir = bstrdup(db_driverdir);
169    }
170    if (db_driver) {
171       mdb->db_driver    = bstrdup(db_driver);
172    }
173    mdb->db_type        = db_type;
174    mdb->db_port        = db_port;
175    mdb->have_insert_id = TRUE;
176    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
177    *mdb->errmsg        = 0;
178    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
179    mdb->cached_path    = get_pool_memory(PM_FNAME);
180    mdb->cached_path_id = 0;
181    mdb->ref_count      = 1;
182    mdb->fname          = get_pool_memory(PM_FNAME);
183    mdb->path           = get_pool_memory(PM_FNAME);
184    mdb->esc_name       = get_pool_memory(PM_FNAME);
185    mdb->esc_path      = get_pool_memory(PM_FNAME);
186    mdb->allow_transactions = mult_db_connections;
187    db_list->append(mdb);                   /* put db in list */
188    V(mutex);
189    return mdb;
190 }
191
192 /*
193  * Now actually open the database.  This can generate errors,
194  *   which are returned in the errmsg
195  *
196  * DO NOT close the database or free(mdb) here  !!!!
197  */
198 int
199 db_open_database(JCR *jcr, B_DB *mdb)
200 {
201    int errstat;
202    int dbstat;
203    uint8_t len;
204    const char *errmsg;
205    char buf[10], *port;
206    int numdrivers;
207    char *db_name = NULL;
208    char *db_dir = NULL;
209
210    P(mutex);
211    if (mdb->connected) {
212       V(mutex);
213       return 1;
214    }
215    mdb->connected = false;
216
217    if ((errstat=rwl_init(&mdb->lock)) != 0) {
218       berrno be;
219       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
220             be.bstrerror(errstat));
221       V(mutex);
222       return 0;
223    }
224
225    if (mdb->db_port) {
226       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
227       port = buf;
228    } else {
229       port = NULL;
230    }
231
232    numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
233    if (numdrivers < 0) {
234       Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
235                                "db_driverdir=%s. It is probaly not found any drivers\n"),
236                                mdb->db_driverdir,numdrivers);
237       V(mutex);
238       return 0;
239    }
240    mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
241    /* Can be many types of databases */
242    switch (mdb->db_type) {
243    case SQL_TYPE_MYSQL:
244       dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
245       dbi_conn_set_option(mdb->db, "port", port);            /* default port */
246       dbi_conn_set_option(mdb->db, "username", mdb->db_user);     /* login name */
247       dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
248       dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);       /* database name */
249       break;
250    case SQL_TYPE_POSTGRESQL:
251       dbi_conn_set_option(mdb->db, "host", mdb->db_address);
252       dbi_conn_set_option(mdb->db, "port", port);
253       dbi_conn_set_option(mdb->db, "username", mdb->db_user);
254       dbi_conn_set_option(mdb->db, "password", mdb->db_password);
255       dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);
256       break;
257    case SQL_TYPE_SQLITE:
258       len = strlen(working_directory) + 5;
259       db_dir = (char *)malloc(len);
260       strcpy(db_dir, working_directory);
261       strcat(db_dir, "/");
262       len = strlen(mdb->db_name) + 5;
263       db_name = (char *)malloc(len);
264       strcpy(db_name, mdb->db_name);
265       strcat(db_name, ".db");
266       dbi_conn_set_option(mdb->db, "sqlite_dbdir", db_dir);
267       dbi_conn_set_option(mdb->db, "dbname", db_name);
268       break;
269    case SQL_TYPE_SQLITE3:
270       len = strlen(working_directory) + 5;
271       db_dir = (char *)malloc(len);
272       strcpy(db_dir, working_directory);
273       strcat(db_dir, "/");
274       len = strlen(mdb->db_name) + 5;
275       db_name = (char *)malloc(len);
276       strcpy(db_name, mdb->db_name);
277       strcat(db_name, ".db");
278       dbi_conn_set_option(mdb->db, "sqlite3_dbdir", db_dir);
279       dbi_conn_set_option(mdb->db, "dbname", db_name);
280       Dmsg2(500, "SQLITE: %s %s\n", db_dir, db_name);
281       break;
282    }
283
284    /* If connection fails, try at 5 sec intervals for 30 seconds. */
285    for (int retry=0; retry < 6; retry++) {
286
287       dbstat = dbi_conn_connect(mdb->db);
288       if ( dbstat == 0) {
289          break;
290       }
291
292       dbi_conn_error(mdb->db, &errmsg);
293       Dmsg1(50, "dbi error: %s\n", errmsg);
294
295       bmicrosleep(5, 0);
296
297    }
298
299    if ( dbstat != 0 ) {
300       Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
301          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
302          mdb->db_driver, mdb->db_name, mdb->db_user);
303       V(mutex);
304       return 0;
305    }
306
307    Dmsg0(50, "dbi_real_connect done\n");
308    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
309                     mdb->db_user, mdb->db_name,
310                     mdb->db_password==NULL?"(NULL)":mdb->db_password);
311
312    mdb->connected = true;
313
314    if (!check_tables_version(jcr, mdb)) {
315       V(mutex);
316       return 0;
317    }
318
319    switch (mdb->db_type) {
320    case SQL_TYPE_MYSQL:
321       /* Set connection timeout to 8 days specialy for batch mode */
322       sql_query(mdb, "SET wait_timeout=691200");
323       sql_query(mdb, "SET interactive_timeout=691200");
324       break;
325    case SQL_TYPE_POSTGRESQL:
326       /* tell PostgreSQL we are using standard conforming strings
327          and avoid warnings such as:
328          WARNING:  nonstandard use of \\ in a string literal
329       */
330       sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
331       sql_query(mdb, "set standard_conforming_strings=on");
332       break;
333    }
334
335    if(db_dir) {
336       free(db_dir);
337    }
338    if(db_name) {
339       free(db_name);
340    }
341
342    V(mutex);
343    return 1;
344 }
345
346 void
347 db_close_database(JCR *jcr, B_DB *mdb)
348 {
349    if (!mdb) {
350       return;
351    }
352    db_end_transaction(jcr, mdb);
353    P(mutex);
354    sql_free_result(mdb);
355    mdb->ref_count--;
356    if (mdb->ref_count == 0) {
357       db_list->remove(mdb);
358       if (mdb->connected && mdb->db) {
359          //sql_close(mdb);
360          dbi_shutdown_r(mdb->instance);
361          mdb->db = NULL;
362          mdb->instance = NULL;
363       }
364       rwl_destroy(&mdb->lock);
365       free_pool_memory(mdb->errmsg);
366       free_pool_memory(mdb->cmd);
367       free_pool_memory(mdb->cached_path);
368       free_pool_memory(mdb->fname);
369       free_pool_memory(mdb->path);
370       free_pool_memory(mdb->esc_name);
371       free_pool_memory(mdb->esc_path);
372       if (mdb->db_name) {
373          free(mdb->db_name);
374       }
375       if (mdb->db_user) {
376          free(mdb->db_user);
377       }
378       if (mdb->db_password) {
379          free(mdb->db_password);
380       }
381       if (mdb->db_address) {
382          free(mdb->db_address);
383       }
384       if (mdb->db_socket) {
385          free(mdb->db_socket);
386       }
387       if (mdb->db_driverdir) {
388          free(mdb->db_driverdir);
389       }
390       if (mdb->db_driver) {
391           free(mdb->db_driver);
392       }
393       free(mdb);
394       if (db_list->size() == 0) {
395          delete db_list;
396          db_list = NULL;
397       }
398    }
399    V(mutex);
400 }
401
402 void db_thread_cleanup()
403 { }
404
405 /*
406  * Return the next unique index (auto-increment) for
407  * the given table.  Return NULL on error.
408  *
409  */
410 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
411 {
412    strcpy(index, "NULL");
413    return 1;
414 }
415
416
417 /*
418  * Escape strings so that DBI is happy
419  *
420  *   NOTE! len is the length of the old string. Your new
421  *         string must be long enough (max 2*old+1) to hold
422  *         the escaped output.
423  *
424  * dbi_conn_quote_string_copy receives a pointer to pointer.
425  * We need copy the value of pointer to snew because libdbi change the
426  * pointer
427  */
428 void
429 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
430 {
431    char *inew;
432    char *pnew;
433
434    if (len == 0) {
435       snew[0] = 0;
436    } else {
437       /* correct the size of old basead in len
438        * and copy new string to inew
439        */
440       inew = (char *)malloc(sizeof(char) * len + 1);
441       bstrncpy(inew,old,len + 1);
442       /* escape the correct size of old */
443       dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
444       free(inew);
445       /* copy the escaped string to snew */
446       bstrncpy(snew, pnew, 2 * len + 1);
447    }
448
449    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
450
451 }
452
453 /*
454  * Submit a general SQL command (cmd), and for each row returned,
455  *  the sqlite_handler is called with the ctx.
456  */
457 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
458 {
459    SQL_ROW row;
460
461    Dmsg0(500, "db_sql_query started\n");
462
463    db_lock(mdb);
464    if (sql_query(mdb, query) != 0) {
465       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
466       db_unlock(mdb);
467       Dmsg0(500, "db_sql_query failed\n");
468       return false;
469    }
470    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
471
472    if (result_handler != NULL) {
473       Dmsg0(500, "db_sql_query invoking handler\n");
474       if ((mdb->result = sql_store_result(mdb)) != NULL) {
475          int num_fields = sql_num_fields(mdb);
476
477          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
478          while ((row = sql_fetch_row(mdb)) != NULL) {
479
480             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
481             if (result_handler(ctx, num_fields, row))
482                break;
483          }
484
485         sql_free_result(mdb);
486       }
487    }
488    db_unlock(mdb);
489
490    Dmsg0(500, "db_sql_query finished\n");
491
492    return true;
493 }
494
495
496
497 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
498 {
499    int j;
500    DBI_ROW row = NULL; // by default, return NULL
501
502    Dmsg0(500, "my_dbi_fetch_row start\n");
503    if ((!mdb->row || mdb->row_size < mdb->num_fields) && mdb->num_rows > 0) {
504       int num_fields = mdb->num_fields;
505       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
506
507       if (mdb->row) {
508          Dmsg0(500, "my_dbi_fetch_row freeing space\n");
509          Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
510          if (mdb->num_rows != 0) {
511             for(j = 0; j < mdb->num_fields; j++) {
512                Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
513                   if(mdb->row[j]) {
514                      free(mdb->row[j]);
515                   }
516             }
517          }
518          free(mdb->row);
519       }
520       //num_fields += 20;                  /* add a bit extra */
521       mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
522       mdb->row_size = num_fields;
523
524       // now reset the row_number now that we have the space allocated
525       mdb->row_number = 1;
526    }
527
528    // if still within the result set
529    if (mdb->row_number <= mdb->num_rows && mdb->row_number != DBI_ERROR_BADPTR) {
530       Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
531       // get each value from this row
532       for (j = 0; j < mdb->num_fields; j++) {
533          mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
534          // allocate space to queue row
535          mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
536          // store the pointer in queue
537          mdb->field_get->value = mdb->row[j];
538          Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
539                j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
540          // insert in queue to future free
541          dbi_getvalue_list->append(mdb->field_get);
542       }
543       // increment the row number for the next call
544       mdb->row_number++;
545
546       row = mdb->row;
547    } else {
548       Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
549    }
550
551    Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
552
553    return row;
554 }
555
556 int my_dbi_max_length(B_DB *mdb, int field_num) {
557    //
558    // for a given column, find the max length
559    //
560    int max_length;
561    int i;
562    int this_length;
563    char *cbuf = NULL;
564
565    max_length = 0;
566    for (i = 0; i < mdb->num_rows; i++) {
567       if (my_dbi_getisnull(mdb->result, i, field_num)) {
568           this_length = 4;        // "NULL"
569       } else {
570          cbuf = my_dbi_getvalue(mdb->result, i, field_num);
571          this_length = cstrlen(cbuf);
572          // cbuf is always free
573          free(cbuf);
574       }
575
576       if (max_length < this_length) {
577           max_length = this_length;
578       }
579    }
580
581    return max_length;
582 }
583
584 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
585 {
586    int     i;
587    int     dbi_index;
588
589    Dmsg0(500, "my_dbi_fetch_field starts\n");
590
591    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
592       if (mdb->fields) {
593          free(mdb->fields);
594       }
595       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
596       mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
597       mdb->fields_size = mdb->num_fields;
598
599       for (i = 0; i < mdb->num_fields; i++) {
600          // num_fileds is starting at 1, increment i by 1
601          dbi_index = i + 1;
602          Dmsg1(500, "filling field %d\n", i);
603          mdb->fields[i].name       = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
604          mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
605          mdb->fields[i].type       = dbi_result_get_field_type_idx(mdb->result, dbi_index);
606          mdb->fields[i].flags      = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
607
608          Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
609             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
610             mdb->fields[i].flags);
611       } // end for
612    } // end if
613
614    // increment field number for the next time around
615
616    Dmsg0(500, "my_dbi_fetch_field finishes\n");
617    return &mdb->fields[mdb->field_number++];
618 }
619
620 void my_dbi_data_seek(B_DB *mdb, int row)
621 {
622    // set the row number to be returned on the next call
623    // to my_dbi_fetch_row
624    mdb->row_number = row;
625 }
626
627 void my_dbi_field_seek(B_DB *mdb, int field)
628 {
629    mdb->field_number = field;
630 }
631
632 /*
633  * Note, if this routine returns 1 (failure), Bacula expects
634  *  that no result has been stored.
635  *
636  *  Returns:  0  on success
637  *            1  on failure
638  *
639  */
640 int my_dbi_query(B_DB *mdb, const char *query)
641 {
642    const char *errmsg;
643    Dmsg1(500, "my_dbi_query started %s\n", query);
644    // We are starting a new query.  reset everything.
645    mdb->num_rows     = -1;
646    mdb->row_number   = -1;
647    mdb->field_number = -1;
648
649    if (mdb->result) {
650       dbi_result_free(mdb->result);  /* hmm, someone forgot to free?? */
651       mdb->result = NULL;
652    }
653
654    mdb->result = (void **)dbi_conn_query(mdb->db, query);
655
656    if (!mdb->result) {
657       Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);
658       goto bail_out;
659    }
660
661    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
662
663    if (mdb->status == DBI_ERROR_NONE) {
664       Dmsg1(500, "we have a result\n", query);
665
666       // how many fields in the set?
667       // num_fields starting at 1
668       mdb->num_fields = dbi_result_get_numfields(mdb->result);
669       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
670       // if no result num_rows is 0
671       mdb->num_rows = dbi_result_get_numrows(mdb->result);
672       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
673
674       mdb->status = (dbi_error_flag) 0;                  /* succeed */
675    } else {
676       Dmsg1(50, "Result status failed: %s\n", query);
677       goto bail_out;
678    }
679
680    Dmsg0(500, "my_dbi_query finishing\n");
681    return mdb->status;
682
683 bail_out:
684    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
685    //dbi_conn_error(mdb->db, &errmsg);
686    Dmsg4(500, "my_dbi_query we failed dbi error: "
687                    "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
688    dbi_result_free(mdb->result);
689    mdb->result = NULL;
690    mdb->status = (dbi_error_flag) 1;                   /* failed */
691    return mdb->status;
692 }
693
694 void my_dbi_free_result(B_DB *mdb)
695 {
696
697    DBI_FIELD_GET *f;
698    db_lock(mdb);
699    if (mdb->result) {
700       Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
701       dbi_result_free(mdb->result);
702    }
703
704    mdb->result = NULL;
705
706    if (mdb->row) {
707       free(mdb->row);
708    }
709
710    /* now is time to free all value return by my_dbi_get_value
711     * this is necessary because libdbi don't free memory return by yours results
712     * and Bacula has some routine wich call more than once time my_dbi_fetch_row
713     *
714     * Using a queue to store all pointer allocate is a good way to free all things
715     * when necessary
716     */
717    foreach_dlist(f, dbi_getvalue_list) {
718       Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
719       free(f->value);
720       free(f);
721    }
722
723    mdb->row = NULL;
724
725    if (mdb->fields) {
726       free(mdb->fields);
727       mdb->fields = NULL;
728    }
729    db_unlock(mdb);
730    Dmsg0(500, "my_dbi_free_result finish\n");
731
732 }
733
734 const char *my_dbi_strerror(B_DB *mdb)
735 {
736    const char *errmsg;
737
738    dbi_conn_error(mdb->db, &errmsg);
739
740    return errmsg;
741 }
742
743 #ifdef HAVE_BATCH_FILE_INSERT
744
745 /*
746  * This can be a bit strang but is the one way to do
747  *
748  * Returns 1 if OK
749  *         0 if failed
750  */
751 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
752 {
753    const char *query = "COPY batch FROM STDIN";
754
755    Dmsg0(500, "my_dbi_batch_start started\n");
756
757    switch (mdb->db_type) {
758    case SQL_TYPE_MYSQL:
759       db_lock(mdb);
760       if (my_dbi_query(mdb,
761                               "CREATE TEMPORARY TABLE batch ("
762                                   "FileIndex integer,"
763                                   "JobId integer,"
764                                   "Path blob,"
765                                   "Name blob,"
766                                   "LStat tinyblob,"
767                                   "MD5 tinyblob)") == 1)
768       {
769          Dmsg0(500, "my_dbi_batch_start failed\n");
770          return 1;
771       }
772       db_unlock(mdb);
773       Dmsg0(500, "my_dbi_batch_start finishing\n");
774       return 1;
775       break;
776    case SQL_TYPE_POSTGRESQL:
777
778       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
779                                   "fileindex int,"
780                                   "jobid int,"
781                                   "path varchar,"
782                                   "name varchar,"
783                                   "lstat varchar,"
784                                   "md5 varchar)") == 1)
785       {
786          Dmsg0(500, "my_dbi_batch_start failed\n");
787          return 1;
788       }
789
790       // We are starting a new query.  reset everything.
791       mdb->num_rows     = -1;
792       mdb->row_number   = -1;
793       mdb->field_number = -1;
794
795       my_dbi_free_result(mdb);
796
797       for (int i=0; i < 10; i++) {
798          my_dbi_query(mdb, query);
799          if (mdb->result) {
800             break;
801          }
802          bmicrosleep(5, 0);
803       }
804       if (!mdb->result) {
805          Dmsg1(50, "Query failed: %s\n", query);
806          goto bail_out;
807       }
808
809       mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
810       //mdb->status = DBI_ERROR_NONE;
811
812       if (mdb->status == DBI_ERROR_NONE) {
813          // how many fields in the set?
814          mdb->num_fields = dbi_result_get_numfields(mdb->result);
815          mdb->num_rows   = dbi_result_get_numrows(mdb->result);
816          mdb->status = (dbi_error_flag) 1;
817       } else {
818          Dmsg1(50, "Result status failed: %s\n", query);
819          goto bail_out;
820       }
821
822       Dmsg0(500, "my_postgresql_batch_start finishing\n");
823
824       return mdb->status;
825       break;
826    case SQL_TYPE_SQLITE:
827       db_lock(mdb);
828       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
829                                   "FileIndex integer,"
830                                   "JobId integer,"
831                                   "Path blob,"
832                                   "Name blob,"
833                                   "LStat tinyblob,"
834                                   "MD5 tinyblob)") == 1)
835       {
836          Dmsg0(500, "my_dbi_batch_start failed\n");
837          goto bail_out;
838       }
839       db_unlock(mdb);
840       Dmsg0(500, "my_dbi_batch_start finishing\n");
841       return 1;
842       break;
843    case SQL_TYPE_SQLITE3:
844       db_lock(mdb);
845       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
846                                   "FileIndex integer,"
847                                   "JobId integer,"
848                                   "Path blob,"
849                                   "Name blob,"
850                                   "LStat tinyblob,"
851                                   "MD5 tinyblob)") == 1)
852       {
853          Dmsg0(500, "my_dbi_batch_start failed\n");
854          goto bail_out;
855       }
856       db_unlock(mdb);
857       Dmsg0(500, "my_dbi_batch_start finishing\n");
858       return 1;
859       break;
860    }
861
862 bail_out:
863    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
864    mdb->status = (dbi_error_flag) 0;
865    my_dbi_free_result(mdb);
866    mdb->result = NULL;
867    return mdb->status;
868 }
869
870 /* set error to something to abort operation */
871 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
872 {
873    int res = 0;
874    int count = 30;
875    int (*custom_function)(void*, const char*) = NULL;
876    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
877
878    Dmsg0(500, "my_dbi_batch_end started\n");
879
880    if (!mdb) {                  /* no files ? */
881       return 0;
882    }
883
884    switch (mdb->db_type) {
885    case SQL_TYPE_MYSQL:
886       if(mdb) {
887          mdb->status = (dbi_error_flag) 0;
888       }
889       break;
890    case SQL_TYPE_POSTGRESQL:
891       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
892
893
894       do {
895          res = (*custom_function)(myconn->connection, error);
896       } while (res == 0 && --count > 0);
897
898       if (res == 1) {
899          Dmsg0(500, "ok\n");
900          mdb->status = (dbi_error_flag) 1;
901       }
902
903       if (res <= 0) {
904          Dmsg0(500, "we failed\n");
905          mdb->status = (dbi_error_flag) 0;
906          //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
907        }
908       break;
909    case SQL_TYPE_SQLITE:
910       if(mdb) {
911          mdb->status = (dbi_error_flag) 0;
912       }
913       break;
914    case SQL_TYPE_SQLITE3:
915       if(mdb) {
916          mdb->status = (dbi_error_flag) 0;
917       }
918       break;
919    }
920
921    Dmsg0(500, "my_dbi_batch_end finishing\n");
922
923    return true;
924 }
925
926 /*
927  * This function is big and use a big switch.
928  * In near future is better split in small functions
929  * and refactory.
930  *
931  */
932 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
933 {
934    int res;
935    int count=30;
936    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
937    int (*custom_function)(void*, const char*, int) = NULL;
938    char* (*custom_function_error)(void*) = NULL;
939    size_t len;
940    char *digest;
941    char ed1[50];
942
943    Dmsg0(500, "my_dbi_batch_insert started \n");
944
945    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
946    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
947
948    if (ar->Digest == NULL || ar->Digest[0] == 0) {
949       *digest = '\0';
950    } else {
951       digest = ar->Digest;
952    }
953
954    switch (mdb->db_type) {
955    case SQL_TYPE_MYSQL:
956       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
957       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
958       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
959                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
960                       mdb->esc_name, ar->attr, digest);
961
962       if (my_dbi_query(mdb,mdb->cmd) == 1)
963       {
964          Dmsg0(500, "my_dbi_batch_insert failed\n");
965          goto bail_out;
966       }
967
968       Dmsg0(500, "my_dbi_batch_insert finishing\n");
969
970       return 1;
971       break;
972    case SQL_TYPE_POSTGRESQL:
973       my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
974       my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
975       len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
976                      ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
977                      mdb->esc_name, ar->attr, digest);
978
979       /* libdbi don't support CopyData and we need call a postgresql
980        * specific function to do this work
981        */
982       Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
983       if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
984             "PQputCopyData")) != NULL) {
985          do {
986             res = (*custom_function)(myconn->connection, mdb->cmd, len);
987          } while (res == 0 && --count > 0);
988
989          if (res == 1) {
990             Dmsg0(500, "ok\n");
991             mdb->changes++;
992             mdb->status = (dbi_error_flag) 1;
993          }
994
995          if (res <= 0) {
996             Dmsg0(500, "my_dbi_batch_insert failed\n");
997             goto bail_out;
998          }
999
1000          Dmsg0(500, "my_dbi_batch_insert finishing\n");
1001          return mdb->status;
1002       } else {
1003          // ensure to detect a PQerror
1004          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
1005          Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1006          goto bail_out;
1007       }
1008       break;
1009    case SQL_TYPE_SQLITE:
1010       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1011       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1012       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1013                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1014                       mdb->esc_name, ar->attr, digest);
1015       if (my_dbi_query(mdb,mdb->cmd) == 1)
1016       {
1017          Dmsg0(500, "my_dbi_batch_insert failed\n");
1018          goto bail_out;
1019       }
1020
1021       Dmsg0(500, "my_dbi_batch_insert finishing\n");
1022
1023       return 1;
1024       break;
1025    case SQL_TYPE_SQLITE3:
1026       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1027       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1028       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1029                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1030                       mdb->esc_name, ar->attr, digest);
1031       if (my_dbi_query(mdb,mdb->cmd) == 1)
1032       {
1033          Dmsg0(500, "my_dbi_batch_insert failed\n");
1034          goto bail_out;
1035       }
1036
1037       Dmsg0(500, "my_dbi_batch_insert finishing\n");
1038
1039       return 1;
1040       break;
1041    }
1042
1043 bail_out:
1044   Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
1045   mdb->status = (dbi_error_flag) 0;
1046   my_dbi_free_result(mdb);
1047   return mdb->status;
1048 }
1049
1050 /*
1051  * Escape strings so that PostgreSQL is happy on COPY
1052  *
1053  *   NOTE! len is the length of the old string. Your new
1054  *         string must be long enough (max 2*old+1) to hold
1055  *         the escaped output.
1056  */
1057 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
1058 {
1059    /* we have to escape \t, \n, \r, \ */
1060    char c = '\0' ;
1061
1062    while (len > 0 && *src) {
1063       switch (*src) {
1064       case '\n':
1065          c = 'n';
1066          break;
1067       case '\\':
1068          c = '\\';
1069          break;
1070       case '\t':
1071          c = 't';
1072          break;
1073       case '\r':
1074          c = 'r';
1075          break;
1076       default:
1077          c = '\0' ;
1078       }
1079
1080       if (c) {
1081          *dest = '\\';
1082          dest++;
1083          *dest = c;
1084       } else {
1085          *dest = *src;
1086       }
1087
1088       len--;
1089       src++;
1090       dest++;
1091    }
1092
1093    *dest = '\0';
1094    return dest;
1095 }
1096
1097 #endif /* HAVE_BATCH_FILE_INSERT */
1098
1099 /* my_dbi_getisnull
1100  * like PQgetisnull
1101  * int PQgetisnull(const PGresult *res,
1102  *              int row_number,
1103  *               int column_number);
1104  *
1105  *  use dbi_result_seek_row to search in result set
1106  */
1107 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1108    int i;
1109
1110    if(row_number == 0) {
1111       row_number++;
1112    }
1113
1114    column_number++;
1115
1116    if(dbi_result_seek_row(result, row_number)) {
1117
1118       i = dbi_result_field_is_null_idx(result,column_number);
1119
1120       return i;
1121    } else {
1122
1123       return 0;
1124    }
1125
1126 }
1127 /* my_dbi_getvalue
1128  * like PQgetvalue;
1129  * char *PQgetvalue(const PGresult *res,
1130  *                int row_number,
1131  *                int column_number);
1132  *
1133  * use dbi_result_seek_row to search in result set
1134  * use example to return only strings
1135  */
1136 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1137
1138    char *buf = NULL;
1139    const char *errmsg;
1140    const char *field_name;
1141    unsigned short dbitype;
1142    size_t field_length;
1143    int64_t num;
1144
1145    /* correct the index for dbi interface
1146     * dbi index begins 1
1147     * I prefer do not change others functions
1148     */
1149    Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
1150                                 result, row_number, column_number);
1151
1152    column_number++;
1153
1154    if(row_number == 0) {
1155      row_number++;
1156    }
1157
1158    Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
1159                         result, row_number, column_number);
1160
1161    if(dbi_result_seek_row(result, row_number)) {
1162
1163       field_name = dbi_result_get_field_name(result, column_number);
1164       field_length = dbi_result_get_field_length(result, field_name);
1165       dbitype = dbi_result_get_field_type_idx(result,column_number);
1166
1167       Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
1168             "field_length bytes: '%d' fieldname: '%s'\n",
1169             dbitype, field_length, field_name);
1170
1171       if(field_length) {
1172          //buf = (char *)malloc(sizeof(char *) * field_length + 1);
1173          buf = (char *)malloc(field_length + 1);
1174       } else {
1175          /* if numbers */
1176          buf = (char *)malloc(sizeof(char *) * 50);
1177       }
1178
1179       switch (dbitype) {
1180       case DBI_TYPE_INTEGER:
1181          num = dbi_result_get_longlong(result, field_name);
1182          edit_int64(num, buf);
1183          field_length = strlen(buf);
1184          break;
1185       case DBI_TYPE_STRING:
1186          if(field_length) {
1187             field_length = bsnprintf(buf, field_length + 1, "%s",
1188             dbi_result_get_string(result, field_name));
1189          } else {
1190             buf[0] = 0;
1191          }
1192          break;
1193       case DBI_TYPE_BINARY:
1194          /* dbi_result_get_binary return a NULL pointer if value is empty
1195          * following, change this to what Bacula espected
1196          */
1197          if(field_length) {
1198             field_length = bsnprintf(buf, field_length + 1, "%s",
1199                   dbi_result_get_binary(result, field_name));
1200          } else {
1201             buf[0] = 0;
1202          }
1203          break;
1204       case DBI_TYPE_DATETIME:
1205          time_t last;
1206          struct tm tm;
1207
1208          last = dbi_result_get_datetime(result, field_name);
1209
1210          if(last == -1) {
1211                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
1212          } else {
1213             (void)localtime_r(&last, &tm);
1214             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1215                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1216                   tm.tm_hour, tm.tm_min, tm.tm_sec);
1217          }
1218          break;
1219       }
1220
1221    } else {
1222       dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1223       Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1224    }
1225
1226    Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
1227       buf, field_length, buf);
1228
1229    // don't worry about this buf
1230    return buf;
1231 }
1232
1233 int my_dbi_sql_insert_id(B_DB *mdb, char *table_name)
1234 {
1235    /*
1236     Obtain the current value of the sequence that
1237     provides the serial value for primary key of the table.
1238
1239     currval is local to our session.  It is not affected by
1240     other transactions.
1241
1242     Determine the name of the sequence.
1243     PostgreSQL automatically creates a sequence using
1244     <table>_<column>_seq.
1245     At the time of writing, all tables used this format for
1246     for their primary key: <table>id
1247     Except for basefiles which has a primary key on baseid.
1248     Therefore, we need to special case that one table.
1249
1250     everything else can use the PostgreSQL formula.
1251    */
1252
1253    char      sequence[30];
1254    uint64_t    id = 0;
1255
1256    if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
1257
1258       if (strcasecmp(table_name, "basefiles") == 0) {
1259          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1260       } else {
1261          bstrncpy(sequence, table_name, sizeof(sequence));
1262          bstrncat(sequence, "_",        sizeof(sequence));
1263          bstrncat(sequence, table_name, sizeof(sequence));
1264          bstrncat(sequence, "id",       sizeof(sequence));
1265       }
1266
1267       bstrncat(sequence, "_seq", sizeof(sequence));
1268       id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1269    } else {
1270       id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
1271    }
1272
1273    return id;
1274 }
1275
1276 #ifdef HAVE_BATCH_FILE_INSERT
1277 const char *my_dbi_batch_lock_path_query[5] = {
1278    /* Mysql */
1279    "LOCK TABLES Path write, batch write, Path as p write",
1280    /* Postgresql */
1281    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
1282    /* SQLite */
1283    "BEGIN",
1284    /* SQLite3 */
1285    "BEGIN",
1286    /* Ingres */
1287    "BEGIN"
1288 };
1289
1290 const char *my_dbi_batch_lock_filename_query[5] = {
1291    /* Mysql */
1292    "LOCK TABLES Filename write, batch write, Filename as f write",
1293    /* Postgresql */
1294    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
1295    /* SQLite */
1296    "BEGIN",
1297    /* SQLite3 */
1298    "BEGIN",
1299    /* Ingres */
1300    "BEGIN"
1301 };
1302
1303 const char *my_dbi_batch_unlock_tables_query[5] = {
1304    /* Mysql */
1305    "UNLOCK TABLES",
1306    /* Postgresql */
1307    "COMMIT",
1308    /* SQLite */
1309    "COMMIT",
1310    /* SQLite3 */
1311    "COMMIT",
1312    /* Ingres */
1313    "COMMIT"
1314 };
1315
1316 const char *my_dbi_batch_fill_path_query[5] = {
1317    /* Mysql */
1318    "INSERT INTO Path (Path) "
1319    "SELECT a.Path FROM "
1320    "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1321    "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1322    /* Postgresql */
1323    "INSERT INTO Path (Path) "
1324    "SELECT a.Path FROM "
1325    "(SELECT DISTINCT Path FROM batch) AS a "
1326    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1327    /* SQLite */
1328    "INSERT INTO Path (Path)"
1329    " SELECT DISTINCT Path FROM batch"
1330    " EXCEPT SELECT Path FROM Path",
1331    /* SQLite3 */
1332    "INSERT INTO Path (Path)"
1333    " SELECT DISTINCT Path FROM batch"
1334    " EXCEPT SELECT Path FROM Path",
1335    /* Ingres */
1336    "INSERT INTO Path (Path) "
1337    "SELECT a.Path FROM "
1338    "(SELECT DISTINCT Path FROM batch) AS a "
1339    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "
1340 };
1341
1342 const char *my_dbi_batch_fill_filename_query[5] = {
1343    /* Mysql */
1344    "INSERT INTO Filename (Name) "
1345    "SELECT a.Name FROM "
1346    "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1347    "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1348    /* Postgresql */
1349    "INSERT INTO Filename (Name) "
1350    "SELECT a.Name FROM "
1351    "(SELECT DISTINCT Name FROM batch) as a "
1352    "WHERE NOT EXISTS "
1353    "(SELECT Name FROM Filename WHERE Name = a.Name)",
1354    /* SQLite */
1355    "INSERT INTO Filename (Name)"
1356    " SELECT DISTINCT Name FROM batch "
1357    " EXCEPT SELECT Name FROM Filename",
1358    /* SQLite3 */
1359    "INSERT INTO Filename (Name)"
1360    " SELECT DISTINCT Name FROM batch "
1361    " EXCEPT SELECT Name FROM Filename",
1362    /* Ingres */
1363    "INSERT INTO Filename (Name) "
1364    "SELECT a.Name FROM "
1365    "(SELECT DISTINCT Name FROM batch) as a "
1366    "WHERE NOT EXISTS "
1367    "(SELECT Name FROM Filename WHERE Name = a.Name)"
1368 };
1369
1370 #endif /* HAVE_BATCH_FILE_INSERT */
1371
1372 const char *my_dbi_match[5] = {
1373    /* Mysql */
1374    "MATCH",
1375    /* Postgresql */
1376    "~",
1377    /* SQLite */
1378    "MATCH",
1379    /* SQLite3 */
1380    "MATCH",
1381    /* Ingres */
1382    "~"
1383 };
1384
1385 #endif /* HAVE_DBI */