]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
Fix stupid inverted logic
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  */
37 /*
38  * This code only compiles against a recent version of libdbi. The current
39  * release found on the libdbi website (0.8.3) won't work for this code.
40  *
41  * You find the libdbi library on http://sourceforge.net/projects/libdbi
42  *
43  * A fairly recent version of libdbi from CVS works, so either make sure
44  * your distribution has a fairly recent version of libdbi installed or
45  * clone the CVS repositories from sourceforge and compile that code and
46  * install it.
47  *
48  * You need:
49  * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
50  * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
51  */
52
53
54 /* The following is necessary so that we do not include
55  * the dummy external definition of DB.
56  */
57 #define __SQL_C                       /* indicate that this is sql.c */
58
59 #include "bacula.h"
60 #include "cats.h"
61
62 #ifdef HAVE_DBI
63
64 /* -----------------------------------------------------------------------
65  *
66  *   DBI dependent defines and subroutines
67  *
68  * -----------------------------------------------------------------------
69  */
70
71 /* List of open databases */
72 static dlist *db_list = NULL;
73
74 /* Control allocated fields by my_dbi_getvalue */
75 static dlist *dbi_getvalue_list = NULL;
76
77 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
78
79 /*
80  * Retrieve database type
81  */
82 const char *
83 db_get_type(void)
84 {
85    return "DBI";
86 }
87
88 /*
89  * Initialize database data structure. In principal this should
90  * never have errors, or it is really fatal.
91  */
92 B_DB *
93 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
94                  const char *db_address, int db_port, const char *db_socket,
95                  int mult_db_connections)
96 {
97    B_DB *mdb = NULL;
98    DBI_FIELD_GET *field;
99    char db_driver[10];
100    char db_driverdir[256];
101
102    /* Constraint the db_driver */
103    if(db_type  == -1) {
104       Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
105       return NULL;
106    }
107
108    /* Do the correct selection of driver.
109     * Can be one of the varius supported by libdbi
110     */
111    switch (db_type) {
112    case SQL_TYPE_MYSQL:
113       bstrncpy(db_driver,"mysql", sizeof(db_driver));
114       break;
115    case SQL_TYPE_POSTGRESQL:
116       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
117       break;
118    case SQL_TYPE_SQLITE:
119       bstrncpy(db_driver,"sqlite", sizeof(db_driver));
120       break;
121    case SQL_TYPE_SQLITE3:
122       bstrncpy(db_driver,"sqlite3", sizeof(db_driver));
123       break;
124    }
125
126    /* Set db_driverdir whereis is the libdbi drivers */
127    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
128
129    if (!db_user) {
130       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
131       return NULL;
132    }
133    P(mutex);                          /* lock DB queue */
134    if (db_list == NULL) {
135       db_list = New(dlist(mdb, &mdb->link));
136       dbi_getvalue_list = New(dlist(field, &field->link));
137    }
138    if (!mult_db_connections) {
139       /* Look to see if DB already open */
140       foreach_dlist(mdb, db_list) {
141          if (bstrcmp(mdb->db_name, db_name) &&
142              bstrcmp(mdb->db_address, db_address) &&
143              bstrcmp(mdb->db_driver, db_driver) &&
144              mdb->db_port == db_port) {
145             Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name,
146                   dbi_conn_error(mdb->db, NULL));
147             mdb->ref_count++;
148             V(mutex);
149             return mdb;                  /* already open */
150          }
151       }
152    }
153    Dmsg0(100, "db_open first time\n");
154    mdb = (B_DB *)malloc(sizeof(B_DB));
155    memset(mdb, 0, sizeof(B_DB));
156    mdb->db_name = bstrdup(db_name);
157    mdb->db_user = bstrdup(db_user);
158    if (db_password) {
159       mdb->db_password = bstrdup(db_password);
160    }
161    if (db_address) {
162       mdb->db_address  = bstrdup(db_address);
163    }
164    if (db_socket) {
165       mdb->db_socket   = bstrdup(db_socket);
166    }
167    if (db_driverdir) {
168       mdb->db_driverdir = bstrdup(db_driverdir);
169    }
170    if (db_driver) {
171       mdb->db_driver    = bstrdup(db_driver);
172    }
173    mdb->db_type        = db_type;
174    mdb->db_port        = db_port;
175    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
176    *mdb->errmsg        = 0;
177    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
178    mdb->cached_path    = get_pool_memory(PM_FNAME);
179    mdb->cached_path_id = 0;
180    mdb->ref_count      = 1;
181    mdb->fname          = get_pool_memory(PM_FNAME);
182    mdb->path           = get_pool_memory(PM_FNAME);
183    mdb->esc_name       = get_pool_memory(PM_FNAME);
184    mdb->esc_path      = get_pool_memory(PM_FNAME);
185    mdb->allow_transactions = mult_db_connections;
186    db_list->append(mdb);                   /* put db in list */
187    V(mutex);
188    return mdb;
189 }
190
191 /*
192  * Now actually open the database.  This can generate errors,
193  *   which are returned in the errmsg
194  *
195  * DO NOT close the database or free(mdb) here  !!!!
196  */
197 int
198 db_open_database(JCR *jcr, B_DB *mdb)
199 {
200    int errstat;
201    int dbstat;
202    uint8_t len;
203    const char *errmsg;
204    char buf[10], *port;
205    int numdrivers;
206    char *db_name = NULL;
207    char *db_dir = NULL;
208
209    P(mutex);
210    if (mdb->connected) {
211       V(mutex);
212       return 1;
213    }
214    mdb->connected = false;
215
216    if ((errstat=rwl_init(&mdb->lock)) != 0) {
217       berrno be;
218       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
219             be.bstrerror(errstat));
220       V(mutex);
221       return 0;
222    }
223
224    if (mdb->db_port) {
225       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
226       port = buf;
227    } else {
228       port = NULL;
229    }
230
231    numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
232    if (numdrivers < 0) {
233       Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
234                                "db_driverdir=%s. It is probaly not found any drivers\n"),
235                                mdb->db_driverdir,numdrivers);
236       V(mutex);
237       return 0;
238    }
239    mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
240    /* Can be many types of databases */
241    switch (mdb->db_type) {
242    case SQL_TYPE_MYSQL:
243       dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
244       dbi_conn_set_option(mdb->db, "port", port);            /* default port */
245       dbi_conn_set_option(mdb->db, "username", mdb->db_user);     /* login name */
246       dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
247       dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);       /* database name */
248       break;
249    case SQL_TYPE_POSTGRESQL:
250       dbi_conn_set_option(mdb->db, "host", mdb->db_address);
251       dbi_conn_set_option(mdb->db, "port", port);
252       dbi_conn_set_option(mdb->db, "username", mdb->db_user);
253       dbi_conn_set_option(mdb->db, "password", mdb->db_password);
254       dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);
255       break;
256    case SQL_TYPE_SQLITE:
257       len = strlen(working_directory) + 5;
258       db_dir = (char *)malloc(len);
259       strcpy(db_dir, working_directory);
260       strcat(db_dir, "/");
261       len = strlen(mdb->db_name) + 5;
262       db_name = (char *)malloc(len);
263       strcpy(db_name, mdb->db_name);
264       strcat(db_name, ".db");
265       dbi_conn_set_option(mdb->db, "sqlite_dbdir", db_dir);
266       dbi_conn_set_option(mdb->db, "dbname", db_name);
267       break;
268    case SQL_TYPE_SQLITE3:
269       len = strlen(working_directory) + 5;
270       db_dir = (char *)malloc(len);
271       strcpy(db_dir, working_directory);
272       strcat(db_dir, "/");
273       len = strlen(mdb->db_name) + 5;
274       db_name = (char *)malloc(len);
275       strcpy(db_name, mdb->db_name);
276       strcat(db_name, ".db");
277       dbi_conn_set_option(mdb->db, "sqlite3_dbdir", db_dir);
278       dbi_conn_set_option(mdb->db, "dbname", db_name);
279       Dmsg2(500, "SQLITE: %s %s\n", db_dir, db_name);
280       break;
281    }
282
283    /* If connection fails, try at 5 sec intervals for 30 seconds. */
284    for (int retry=0; retry < 6; retry++) {
285
286       dbstat = dbi_conn_connect(mdb->db);
287       if ( dbstat == 0) {
288          break;
289       }
290
291       dbi_conn_error(mdb->db, &errmsg);
292       Dmsg1(50, "dbi error: %s\n", errmsg);
293
294       bmicrosleep(5, 0);
295
296    }
297
298    if ( dbstat != 0 ) {
299       Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
300          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
301          mdb->db_driver, mdb->db_name, mdb->db_user);
302       V(mutex);
303       return 0;
304    }
305
306    Dmsg0(50, "dbi_real_connect done\n");
307    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
308                     mdb->db_user, mdb->db_name,
309                     mdb->db_password==NULL?"(NULL)":mdb->db_password);
310
311    mdb->connected = true;
312
313    if (!check_tables_version(jcr, mdb)) {
314       V(mutex);
315       return 0;
316    }
317
318    switch (mdb->db_type) {
319    case SQL_TYPE_MYSQL:
320       /* Set connection timeout to 8 days specialy for batch mode */
321       sql_query(mdb, "SET wait_timeout=691200");
322       sql_query(mdb, "SET interactive_timeout=691200");
323       break;
324    case SQL_TYPE_POSTGRESQL:
325       /* tell PostgreSQL we are using standard conforming strings
326          and avoid warnings such as:
327          WARNING:  nonstandard use of \\ in a string literal
328       */
329       sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
330       sql_query(mdb, "set standard_conforming_strings=on");
331       break;
332    }
333
334    if(db_dir) {
335       free(db_dir);
336    }
337    if(db_name) {
338       free(db_name);
339    }
340
341    V(mutex);
342    return 1;
343 }
344
345 void
346 db_close_database(JCR *jcr, B_DB *mdb)
347 {
348    if (!mdb) {
349       return;
350    }
351    db_end_transaction(jcr, mdb);
352    P(mutex);
353    sql_free_result(mdb);
354    mdb->ref_count--;
355    if (mdb->ref_count == 0) {
356       db_list->remove(mdb);
357       if (mdb->connected && mdb->db) {
358          //sql_close(mdb);
359          dbi_shutdown_r(mdb->instance);
360          mdb->db = NULL;
361          mdb->instance = NULL;
362       }
363       rwl_destroy(&mdb->lock);
364       free_pool_memory(mdb->errmsg);
365       free_pool_memory(mdb->cmd);
366       free_pool_memory(mdb->cached_path);
367       free_pool_memory(mdb->fname);
368       free_pool_memory(mdb->path);
369       free_pool_memory(mdb->esc_name);
370       free_pool_memory(mdb->esc_path);
371       if (mdb->db_name) {
372          free(mdb->db_name);
373       }
374       if (mdb->db_user) {
375          free(mdb->db_user);
376       }
377       if (mdb->db_password) {
378          free(mdb->db_password);
379       }
380       if (mdb->db_address) {
381          free(mdb->db_address);
382       }
383       if (mdb->db_socket) {
384          free(mdb->db_socket);
385       }
386       if (mdb->db_driverdir) {
387          free(mdb->db_driverdir);
388       }
389       if (mdb->db_driver) {
390           free(mdb->db_driver);
391       }
392       free(mdb);
393       if (db_list->size() == 0) {
394          delete db_list;
395          db_list = NULL;
396       }
397    }
398    V(mutex);
399 }
400
401 void db_check_backend_thread_safe()
402 { }
403
404 void db_thread_cleanup()
405 { }
406
407 /*
408  * Return the next unique index (auto-increment) for
409  * the given table.  Return NULL on error.
410  *
411  */
412 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
413 {
414    strcpy(index, "NULL");
415    return 1;
416 }
417
418
419 /*
420  * Escape strings so that DBI is happy
421  *
422  *   NOTE! len is the length of the old string. Your new
423  *         string must be long enough (max 2*old+1) to hold
424  *         the escaped output.
425  *
426  * dbi_conn_quote_string_copy receives a pointer to pointer.
427  * We need copy the value of pointer to snew because libdbi change the
428  * pointer
429  */
430 void
431 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
432 {
433    char *inew;
434    char *pnew;
435
436    if (len == 0) {
437       snew[0] = 0;
438    } else {
439       /* correct the size of old basead in len
440        * and copy new string to inew
441        */
442       inew = (char *)malloc(sizeof(char) * len + 1);
443       bstrncpy(inew,old,len + 1);
444       /* escape the correct size of old */
445       dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
446       free(inew);
447       /* copy the escaped string to snew */
448       bstrncpy(snew, pnew, 2 * len + 1);
449    }
450
451    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
452
453 }
454
455 /*
456  * Submit a general SQL command (cmd), and for each row returned,
457  *  the sqlite_handler is called with the ctx.
458  */
459 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
460 {
461    SQL_ROW row;
462
463    Dmsg0(500, "db_sql_query started\n");
464
465    db_lock(mdb);
466    if (sql_query(mdb, query) != 0) {
467       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
468       db_unlock(mdb);
469       Dmsg0(500, "db_sql_query failed\n");
470       return false;
471    }
472    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
473
474    if (result_handler != NULL) {
475       Dmsg0(500, "db_sql_query invoking handler\n");
476       if ((mdb->result = sql_store_result(mdb)) != NULL) {
477          int num_fields = sql_num_fields(mdb);
478
479          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
480          while ((row = sql_fetch_row(mdb)) != NULL) {
481
482             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
483             if (result_handler(ctx, num_fields, row))
484                break;
485          }
486
487         sql_free_result(mdb);
488       }
489    }
490    db_unlock(mdb);
491
492    Dmsg0(500, "db_sql_query finished\n");
493
494    return true;
495 }
496
497
498
499 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
500 {
501    int j;
502    DBI_ROW row = NULL; // by default, return NULL
503
504    Dmsg0(500, "my_dbi_fetch_row start\n");
505    if ((!mdb->row || mdb->row_size < mdb->num_fields) && mdb->num_rows > 0) {
506       int num_fields = mdb->num_fields;
507       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
508
509       if (mdb->row) {
510          Dmsg0(500, "my_dbi_fetch_row freeing space\n");
511          Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
512          if (mdb->num_rows != 0) {
513             for(j = 0; j < mdb->num_fields; j++) {
514                Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
515                   if(mdb->row[j]) {
516                      free(mdb->row[j]);
517                   }
518             }
519          }
520          free(mdb->row);
521       }
522       //num_fields += 20;                  /* add a bit extra */
523       mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
524       mdb->row_size = num_fields;
525
526       // now reset the row_number now that we have the space allocated
527       mdb->row_number = 1;
528    }
529
530    // if still within the result set
531    if (mdb->row_number <= mdb->num_rows && mdb->row_number != DBI_ERROR_BADPTR) {
532       Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
533       // get each value from this row
534       for (j = 0; j < mdb->num_fields; j++) {
535          mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
536          // allocate space to queue row
537          mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
538          // store the pointer in queue
539          mdb->field_get->value = mdb->row[j];
540          Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
541                j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
542          // insert in queue to future free
543          dbi_getvalue_list->append(mdb->field_get);
544       }
545       // increment the row number for the next call
546       mdb->row_number++;
547
548       row = mdb->row;
549    } else {
550       Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
551    }
552
553    Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
554
555    return row;
556 }
557
558 int my_dbi_max_length(B_DB *mdb, int field_num) {
559    //
560    // for a given column, find the max length
561    //
562    int max_length;
563    int i;
564    int this_length;
565    char *cbuf = NULL;
566
567    max_length = 0;
568    for (i = 0; i < mdb->num_rows; i++) {
569       if (my_dbi_getisnull(mdb->result, i, field_num)) {
570           this_length = 4;        // "NULL"
571       } else {
572          cbuf = my_dbi_getvalue(mdb->result, i, field_num);
573          this_length = cstrlen(cbuf);
574          // cbuf is always free
575          free(cbuf);
576       }
577
578       if (max_length < this_length) {
579           max_length = this_length;
580       }
581    }
582
583    return max_length;
584 }
585
586 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
587 {
588    int     i;
589    int     dbi_index;
590
591    Dmsg0(500, "my_dbi_fetch_field starts\n");
592
593    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
594       if (mdb->fields) {
595          free(mdb->fields);
596       }
597       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
598       mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
599       mdb->fields_size = mdb->num_fields;
600
601       for (i = 0; i < mdb->num_fields; i++) {
602          // num_fileds is starting at 1, increment i by 1
603          dbi_index = i + 1;
604          Dmsg1(500, "filling field %d\n", i);
605          mdb->fields[i].name       = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
606          mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
607          mdb->fields[i].type       = dbi_result_get_field_type_idx(mdb->result, dbi_index);
608          mdb->fields[i].flags      = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
609
610          Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
611             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
612             mdb->fields[i].flags);
613       } // end for
614    } // end if
615
616    // increment field number for the next time around
617
618    Dmsg0(500, "my_dbi_fetch_field finishes\n");
619    return &mdb->fields[mdb->field_number++];
620 }
621
622 void my_dbi_data_seek(B_DB *mdb, int row)
623 {
624    // set the row number to be returned on the next call
625    // to my_dbi_fetch_row
626    mdb->row_number = row;
627 }
628
629 void my_dbi_field_seek(B_DB *mdb, int field)
630 {
631    mdb->field_number = field;
632 }
633
634 /*
635  * Note, if this routine returns 1 (failure), Bacula expects
636  *  that no result has been stored.
637  *
638  *  Returns:  0  on success
639  *            1  on failure
640  *
641  */
642 int my_dbi_query(B_DB *mdb, const char *query)
643 {
644    const char *errmsg;
645    Dmsg1(500, "my_dbi_query started %s\n", query);
646    // We are starting a new query.  reset everything.
647    mdb->num_rows     = -1;
648    mdb->row_number   = -1;
649    mdb->field_number = -1;
650
651    if (mdb->result) {
652       dbi_result_free(mdb->result);  /* hmm, someone forgot to free?? */
653       mdb->result = NULL;
654    }
655
656    mdb->result = (void **)dbi_conn_query(mdb->db, query);
657
658    if (!mdb->result) {
659       Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);
660       goto bail_out;
661    }
662
663    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
664
665    if (mdb->status == DBI_ERROR_NONE) {
666       Dmsg1(500, "we have a result\n", query);
667
668       // how many fields in the set?
669       // num_fields starting at 1
670       mdb->num_fields = dbi_result_get_numfields(mdb->result);
671       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
672       // if no result num_rows is 0
673       mdb->num_rows = dbi_result_get_numrows(mdb->result);
674       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
675
676       mdb->status = (dbi_error_flag) 0;                  /* succeed */
677    } else {
678       Dmsg1(50, "Result status failed: %s\n", query);
679       goto bail_out;
680    }
681
682    Dmsg0(500, "my_dbi_query finishing\n");
683    return mdb->status;
684
685 bail_out:
686    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
687    //dbi_conn_error(mdb->db, &errmsg);
688    Dmsg4(500, "my_dbi_query we failed dbi error: "
689                    "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
690    dbi_result_free(mdb->result);
691    mdb->result = NULL;
692    mdb->status = (dbi_error_flag) 1;                   /* failed */
693    return mdb->status;
694 }
695
696 void my_dbi_free_result(B_DB *mdb)
697 {
698
699    DBI_FIELD_GET *f;
700    db_lock(mdb);
701    if (mdb->result) {
702       Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
703       dbi_result_free(mdb->result);
704    }
705
706    mdb->result = NULL;
707
708    if (mdb->row) {
709       free(mdb->row);
710    }
711
712    /* now is time to free all value return by my_dbi_get_value
713     * this is necessary because libdbi don't free memory return by yours results
714     * and Bacula has some routine wich call more than once time my_dbi_fetch_row
715     *
716     * Using a queue to store all pointer allocate is a good way to free all things
717     * when necessary
718     */
719    foreach_dlist(f, dbi_getvalue_list) {
720       Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
721       free(f->value);
722       free(f);
723    }
724
725    mdb->row = NULL;
726
727    if (mdb->fields) {
728       free(mdb->fields);
729       mdb->fields = NULL;
730    }
731    db_unlock(mdb);
732    Dmsg0(500, "my_dbi_free_result finish\n");
733
734 }
735
736 const char *my_dbi_strerror(B_DB *mdb)
737 {
738    const char *errmsg;
739
740    dbi_conn_error(mdb->db, &errmsg);
741
742    return errmsg;
743 }
744
745 #ifdef HAVE_BATCH_FILE_INSERT
746
747 /*
748  * This can be a bit strang but is the one way to do
749  *
750  * Returns 1 if OK
751  *         0 if failed
752  */
753 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
754 {
755    const char *query = "COPY batch FROM STDIN";
756
757    Dmsg0(500, "my_dbi_batch_start started\n");
758
759    switch (mdb->db_type) {
760    case SQL_TYPE_MYSQL:
761       db_lock(mdb);
762       if (my_dbi_query(mdb,
763                               "CREATE TEMPORARY TABLE batch ("
764                                   "FileIndex integer,"
765                                   "JobId integer,"
766                                   "Path blob,"
767                                   "Name blob,"
768                                   "LStat tinyblob,"
769                                   "MD5 tinyblob)") == 1)
770       {
771          Dmsg0(500, "my_dbi_batch_start failed\n");
772          return 1;
773       }
774       db_unlock(mdb);
775       Dmsg0(500, "my_dbi_batch_start finishing\n");
776       return 1;
777       break;
778    case SQL_TYPE_POSTGRESQL:
779
780       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
781                                   "fileindex int,"
782                                   "jobid int,"
783                                   "path varchar,"
784                                   "name varchar,"
785                                   "lstat varchar,"
786                                   "md5 varchar)") == 1)
787       {
788          Dmsg0(500, "my_dbi_batch_start failed\n");
789          return 1;
790       }
791
792       // We are starting a new query.  reset everything.
793       mdb->num_rows     = -1;
794       mdb->row_number   = -1;
795       mdb->field_number = -1;
796
797       my_dbi_free_result(mdb);
798
799       for (int i=0; i < 10; i++) {
800          my_dbi_query(mdb, query);
801          if (mdb->result) {
802             break;
803          }
804          bmicrosleep(5, 0);
805       }
806       if (!mdb->result) {
807          Dmsg1(50, "Query failed: %s\n", query);
808          goto bail_out;
809       }
810
811       mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
812       //mdb->status = DBI_ERROR_NONE;
813
814       if (mdb->status == DBI_ERROR_NONE) {
815          // how many fields in the set?
816          mdb->num_fields = dbi_result_get_numfields(mdb->result);
817          mdb->num_rows   = dbi_result_get_numrows(mdb->result);
818          mdb->status = (dbi_error_flag) 1;
819       } else {
820          Dmsg1(50, "Result status failed: %s\n", query);
821          goto bail_out;
822       }
823
824       Dmsg0(500, "my_postgresql_batch_start finishing\n");
825
826       return mdb->status;
827       break;
828    case SQL_TYPE_SQLITE:
829       db_lock(mdb);
830       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
831                                   "FileIndex integer,"
832                                   "JobId integer,"
833                                   "Path blob,"
834                                   "Name blob,"
835                                   "LStat tinyblob,"
836                                   "MD5 tinyblob)") == 1)
837       {
838          Dmsg0(500, "my_dbi_batch_start failed\n");
839          goto bail_out;
840       }
841       db_unlock(mdb);
842       Dmsg0(500, "my_dbi_batch_start finishing\n");
843       return 1;
844       break;
845    case SQL_TYPE_SQLITE3:
846       db_lock(mdb);
847       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
848                                   "FileIndex integer,"
849                                   "JobId integer,"
850                                   "Path blob,"
851                                   "Name blob,"
852                                   "LStat tinyblob,"
853                                   "MD5 tinyblob)") == 1)
854       {
855          Dmsg0(500, "my_dbi_batch_start failed\n");
856          goto bail_out;
857       }
858       db_unlock(mdb);
859       Dmsg0(500, "my_dbi_batch_start finishing\n");
860       return 1;
861       break;
862    }
863
864 bail_out:
865    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
866    mdb->status = (dbi_error_flag) 0;
867    my_dbi_free_result(mdb);
868    mdb->result = NULL;
869    return mdb->status;
870 }
871
872 /* set error to something to abort operation */
873 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
874 {
875    int res = 0;
876    int count = 30;
877    int (*custom_function)(void*, const char*) = NULL;
878    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
879
880    Dmsg0(500, "my_dbi_batch_end started\n");
881
882    if (!mdb) {                  /* no files ? */
883       return 0;
884    }
885
886    switch (mdb->db_type) {
887    case SQL_TYPE_MYSQL:
888       if(mdb) {
889          mdb->status = (dbi_error_flag) 0;
890       }
891       break;
892    case SQL_TYPE_POSTGRESQL:
893       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
894
895
896       do {
897          res = (*custom_function)(myconn->connection, error);
898       } while (res == 0 && --count > 0);
899
900       if (res == 1) {
901          Dmsg0(500, "ok\n");
902          mdb->status = (dbi_error_flag) 1;
903       }
904
905       if (res <= 0) {
906          Dmsg0(500, "we failed\n");
907          mdb->status = (dbi_error_flag) 0;
908          //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
909        }
910       break;
911    case SQL_TYPE_SQLITE:
912       if(mdb) {
913          mdb->status = (dbi_error_flag) 0;
914       }
915       break;
916    case SQL_TYPE_SQLITE3:
917       if(mdb) {
918          mdb->status = (dbi_error_flag) 0;
919       }
920       break;
921    }
922
923    Dmsg0(500, "my_dbi_batch_end finishing\n");
924
925    return true;
926 }
927
928 /*
929  * This function is big and use a big switch.
930  * In near future is better split in small functions
931  * and refactory.
932  *
933  */
934 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
935 {
936    int res;
937    int count=30;
938    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
939    int (*custom_function)(void*, const char*, int) = NULL;
940    char* (*custom_function_error)(void*) = NULL;
941    size_t len;
942    char *digest;
943    char ed1[50];
944
945    Dmsg0(500, "my_dbi_batch_insert started \n");
946
947    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
948    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
949
950    if (ar->Digest == NULL || ar->Digest[0] == 0) {
951       *digest = '\0';
952    } else {
953       digest = ar->Digest;
954    }
955
956    switch (mdb->db_type) {
957    case SQL_TYPE_MYSQL:
958       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
959       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
960       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
961                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
962                       mdb->esc_name, ar->attr, digest);
963
964       if (my_dbi_query(mdb,mdb->cmd) == 1)
965       {
966          Dmsg0(500, "my_dbi_batch_insert failed\n");
967          goto bail_out;
968       }
969
970       Dmsg0(500, "my_dbi_batch_insert finishing\n");
971
972       return 1;
973       break;
974    case SQL_TYPE_POSTGRESQL:
975       my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
976       my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
977       len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
978                      ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
979                      mdb->esc_name, ar->attr, digest);
980
981       /* libdbi don't support CopyData and we need call a postgresql
982        * specific function to do this work
983        */
984       Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
985       if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
986             "PQputCopyData")) != NULL) {
987          do {
988             res = (*custom_function)(myconn->connection, mdb->cmd, len);
989          } while (res == 0 && --count > 0);
990
991          if (res == 1) {
992             Dmsg0(500, "ok\n");
993             mdb->changes++;
994             mdb->status = (dbi_error_flag) 1;
995          }
996
997          if (res <= 0) {
998             Dmsg0(500, "my_dbi_batch_insert failed\n");
999             goto bail_out;
1000          }
1001
1002          Dmsg0(500, "my_dbi_batch_insert finishing\n");
1003          return mdb->status;
1004       } else {
1005          // ensure to detect a PQerror
1006          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
1007          Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1008          goto bail_out;
1009       }
1010       break;
1011    case SQL_TYPE_SQLITE:
1012       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1013       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1014       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1015                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1016                       mdb->esc_name, ar->attr, digest);
1017       if (my_dbi_query(mdb,mdb->cmd) == 1)
1018       {
1019          Dmsg0(500, "my_dbi_batch_insert failed\n");
1020          goto bail_out;
1021       }
1022
1023       Dmsg0(500, "my_dbi_batch_insert finishing\n");
1024
1025       return 1;
1026       break;
1027    case SQL_TYPE_SQLITE3:
1028       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1029       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1030       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1031                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1032                       mdb->esc_name, ar->attr, digest);
1033       if (my_dbi_query(mdb,mdb->cmd) == 1)
1034       {
1035          Dmsg0(500, "my_dbi_batch_insert failed\n");
1036          goto bail_out;
1037       }
1038
1039       Dmsg0(500, "my_dbi_batch_insert finishing\n");
1040
1041       return 1;
1042       break;
1043    }
1044
1045 bail_out:
1046   Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
1047   mdb->status = (dbi_error_flag) 0;
1048   my_dbi_free_result(mdb);
1049   return mdb->status;
1050 }
1051
1052 /*
1053  * Escape strings so that PostgreSQL is happy on COPY
1054  *
1055  *   NOTE! len is the length of the old string. Your new
1056  *         string must be long enough (max 2*old+1) to hold
1057  *         the escaped output.
1058  */
1059 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
1060 {
1061    /* we have to escape \t, \n, \r, \ */
1062    char c = '\0' ;
1063
1064    while (len > 0 && *src) {
1065       switch (*src) {
1066       case '\n':
1067          c = 'n';
1068          break;
1069       case '\\':
1070          c = '\\';
1071          break;
1072       case '\t':
1073          c = 't';
1074          break;
1075       case '\r':
1076          c = 'r';
1077          break;
1078       default:
1079          c = '\0' ;
1080       }
1081
1082       if (c) {
1083          *dest = '\\';
1084          dest++;
1085          *dest = c;
1086       } else {
1087          *dest = *src;
1088       }
1089
1090       len--;
1091       src++;
1092       dest++;
1093    }
1094
1095    *dest = '\0';
1096    return dest;
1097 }
1098
1099 #endif /* HAVE_BATCH_FILE_INSERT */
1100
1101 /* my_dbi_getisnull
1102  * like PQgetisnull
1103  * int PQgetisnull(const PGresult *res,
1104  *              int row_number,
1105  *               int column_number);
1106  *
1107  *  use dbi_result_seek_row to search in result set
1108  */
1109 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1110    int i;
1111
1112    if(row_number == 0) {
1113       row_number++;
1114    }
1115
1116    column_number++;
1117
1118    if(dbi_result_seek_row(result, row_number)) {
1119
1120       i = dbi_result_field_is_null_idx(result,column_number);
1121
1122       return i;
1123    } else {
1124
1125       return 0;
1126    }
1127
1128 }
1129 /* my_dbi_getvalue
1130  * like PQgetvalue;
1131  * char *PQgetvalue(const PGresult *res,
1132  *                int row_number,
1133  *                int column_number);
1134  *
1135  * use dbi_result_seek_row to search in result set
1136  * use example to return only strings
1137  */
1138 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1139
1140    char *buf = NULL;
1141    const char *errmsg;
1142    const char *field_name;
1143    unsigned short dbitype;
1144    size_t field_length;
1145    int64_t num;
1146
1147    /* correct the index for dbi interface
1148     * dbi index begins 1
1149     * I prefer do not change others functions
1150     */
1151    Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
1152                                 result, row_number, column_number);
1153
1154    column_number++;
1155
1156    if(row_number == 0) {
1157      row_number++;
1158    }
1159
1160    Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
1161                         result, row_number, column_number);
1162
1163    if(dbi_result_seek_row(result, row_number)) {
1164
1165       field_name = dbi_result_get_field_name(result, column_number);
1166       field_length = dbi_result_get_field_length(result, field_name);
1167       dbitype = dbi_result_get_field_type_idx(result,column_number);
1168
1169       Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
1170             "field_length bytes: '%d' fieldname: '%s'\n",
1171             dbitype, field_length, field_name);
1172
1173       if(field_length) {
1174          //buf = (char *)malloc(sizeof(char *) * field_length + 1);
1175          buf = (char *)malloc(field_length + 1);
1176       } else {
1177          /* if numbers */
1178          buf = (char *)malloc(sizeof(char *) * 50);
1179       }
1180
1181       switch (dbitype) {
1182       case DBI_TYPE_INTEGER:
1183          num = dbi_result_get_longlong(result, field_name);
1184          edit_int64(num, buf);
1185          field_length = strlen(buf);
1186          break;
1187       case DBI_TYPE_STRING:
1188          if(field_length) {
1189             field_length = bsnprintf(buf, field_length + 1, "%s",
1190             dbi_result_get_string(result, field_name));
1191          } else {
1192             buf[0] = 0;
1193          }
1194          break;
1195       case DBI_TYPE_BINARY:
1196          /* dbi_result_get_binary return a NULL pointer if value is empty
1197          * following, change this to what Bacula espected
1198          */
1199          if(field_length) {
1200             field_length = bsnprintf(buf, field_length + 1, "%s",
1201                   dbi_result_get_binary(result, field_name));
1202          } else {
1203             buf[0] = 0;
1204          }
1205          break;
1206       case DBI_TYPE_DATETIME:
1207          time_t last;
1208          struct tm tm;
1209
1210          last = dbi_result_get_datetime(result, field_name);
1211
1212          if(last == -1) {
1213                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
1214          } else {
1215             (void)localtime_r(&last, &tm);
1216             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1217                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1218                   tm.tm_hour, tm.tm_min, tm.tm_sec);
1219          }
1220          break;
1221       }
1222
1223    } else {
1224       dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1225       Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1226    }
1227
1228    Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
1229       buf, field_length, buf);
1230
1231    // don't worry about this buf
1232    return buf;
1233 }
1234
1235 static int my_dbi_sequence_last(B_DB *mdb, const char *table_name)
1236 {
1237    /*
1238     Obtain the current value of the sequence that
1239     provides the serial value for primary key of the table.
1240
1241     currval is local to our session.  It is not affected by
1242     other transactions.
1243
1244     Determine the name of the sequence.
1245     PostgreSQL automatically creates a sequence using
1246     <table>_<column>_seq.
1247     At the time of writing, all tables used this format for
1248     for their primary key: <table>id
1249     Except for basefiles which has a primary key on baseid.
1250     Therefore, we need to special case that one table.
1251
1252     everything else can use the PostgreSQL formula.
1253    */
1254
1255    char sequence[30];
1256    uint64_t id = 0;
1257
1258    if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
1259
1260       if (strcasecmp(table_name, "basefiles") == 0) {
1261          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1262       } else {
1263          bstrncpy(sequence, table_name, sizeof(sequence));
1264          bstrncat(sequence, "_",        sizeof(sequence));
1265          bstrncat(sequence, table_name, sizeof(sequence));
1266          bstrncat(sequence, "id",       sizeof(sequence));
1267       }
1268
1269       bstrncat(sequence, "_seq", sizeof(sequence));
1270       id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1271    } else {
1272       id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
1273    }
1274
1275    return id;
1276 }
1277
1278 int my_dbi_sql_insert_id(B_DB *mdb, const char *query, const char *table_name)
1279 {
1280    /*
1281     * First execute the insert query and then retrieve the currval.
1282     */
1283    if (my_dbi_query(mdb, query)) {
1284       return 0;
1285    }
1286
1287    mdb->num_rows = sql_affected_rows(mdb);
1288    if (mdb->num_rows != 1) {
1289       return 0;
1290    }
1291
1292    mdb->changes++;
1293
1294    return my_dbi_sequence_last(mdb, table_name);
1295 }
1296
1297 #ifdef HAVE_BATCH_FILE_INSERT
1298 const char *my_dbi_batch_lock_path_query[5] = {
1299    /* Mysql */
1300    "LOCK TABLES Path write, batch write, Path as p write",
1301    /* Postgresql */
1302    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
1303    /* SQLite */
1304    "BEGIN",
1305    /* SQLite3 */
1306    "BEGIN",
1307    /* Ingres */
1308    "BEGIN"
1309 };
1310
1311 const char *my_dbi_batch_lock_filename_query[5] = {
1312    /* Mysql */
1313    "LOCK TABLES Filename write, batch write, Filename as f write",
1314    /* Postgresql */
1315    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
1316    /* SQLite */
1317    "BEGIN",
1318    /* SQLite3 */
1319    "BEGIN",
1320    /* Ingres */
1321    "BEGIN"
1322 };
1323
1324 const char *my_dbi_batch_unlock_tables_query[5] = {
1325    /* Mysql */
1326    "UNLOCK TABLES",
1327    /* Postgresql */
1328    "COMMIT",
1329    /* SQLite */
1330    "COMMIT",
1331    /* SQLite3 */
1332    "COMMIT",
1333    /* Ingres */
1334    "COMMIT"
1335 };
1336
1337 const char *my_dbi_batch_fill_path_query[5] = {
1338    /* Mysql */
1339    "INSERT INTO Path (Path) "
1340    "SELECT a.Path FROM "
1341    "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1342    "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1343    /* Postgresql */
1344    "INSERT INTO Path (Path) "
1345    "SELECT a.Path FROM "
1346    "(SELECT DISTINCT Path FROM batch) AS a "
1347    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1348    /* SQLite */
1349    "INSERT INTO Path (Path)"
1350    " SELECT DISTINCT Path FROM batch"
1351    " EXCEPT SELECT Path FROM Path",
1352    /* SQLite3 */
1353    "INSERT INTO Path (Path)"
1354    " SELECT DISTINCT Path FROM batch"
1355    " EXCEPT SELECT Path FROM Path",
1356    /* Ingres */
1357    "INSERT INTO Path (Path) "
1358    "SELECT a.Path FROM "
1359    "(SELECT DISTINCT Path FROM batch) AS a "
1360    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "
1361 };
1362
1363 const char *my_dbi_batch_fill_filename_query[5] = {
1364    /* Mysql */
1365    "INSERT INTO Filename (Name) "
1366    "SELECT a.Name FROM "
1367    "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1368    "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1369    /* Postgresql */
1370    "INSERT INTO Filename (Name) "
1371    "SELECT a.Name FROM "
1372    "(SELECT DISTINCT Name FROM batch) as a "
1373    "WHERE NOT EXISTS "
1374    "(SELECT Name FROM Filename WHERE Name = a.Name)",
1375    /* SQLite */
1376    "INSERT INTO Filename (Name)"
1377    " SELECT DISTINCT Name FROM batch "
1378    " EXCEPT SELECT Name FROM Filename",
1379    /* SQLite3 */
1380    "INSERT INTO Filename (Name)"
1381    " SELECT DISTINCT Name FROM batch "
1382    " EXCEPT SELECT Name FROM Filename",
1383    /* Ingres */
1384    "INSERT INTO Filename (Name) "
1385    "SELECT a.Name FROM "
1386    "(SELECT DISTINCT Name FROM batch) as a "
1387    "WHERE NOT EXISTS "
1388    "(SELECT Name FROM Filename WHERE Name = a.Name)"
1389 };
1390
1391 #endif /* HAVE_BATCH_FILE_INSERT */
1392
1393 const char *my_dbi_match[5] = {
1394    /* Mysql */
1395    "MATCH",
1396    /* Postgresql */
1397    "~",
1398    /* SQLite */
1399    "MATCH",
1400    /* SQLite3 */
1401    "MATCH",
1402    /* Ingres */
1403    "~"
1404 };
1405
1406 #endif /* HAVE_DBI */