]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
d178bc3ebeb55aee4caab11dbf638916e0d939a9
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  */
37 /*
38  * This code only compiles against a recent version of libdbi. The current
39  * release found on the libdbi website (0.8.3) won't work for this code.
40  *
41  * You find the libdbi library on http://sourceforge.net/projects/libdbi
42  *
43  * A fairly recent version of libdbi from CVS works, so either make sure
44  * your distribution has a fairly recent version of libdbi installed or
45  * clone the CVS repositories from sourceforge and compile that code and
46  * install it.
47  *
48  * You need:
49  * cvs co :pserver:anonymous@libdbi.cvs.sourceforge.net:/cvsroot/libdbi
50  * cvs co :pserver:anonymous@libdbi-drivers.cvs.sourceforge.net:/cvsroot/libdbi-drivers
51  */
52
53
54 /* The following is necessary so that we do not include
55  * the dummy external definition of DB.
56  */
57 #define __SQL_C                       /* indicate that this is sql.c */
58
59 #include "bacula.h"
60 #include "cats.h"
61
62 #ifdef HAVE_DBI
63
64 /* -----------------------------------------------------------------------
65  *
66  *   DBI dependent defines and subroutines
67  *
68  * -----------------------------------------------------------------------
69  */
70
71 /* List of open databases */
72 static dlist *db_list = NULL;
73
74 /* Control allocated fields by my_dbi_getvalue */
75 static dlist *dbi_getvalue_list = NULL;
76
77 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
78
79 /*
80  * Retrieve database type
81  */
82 const char *
83 db_get_type(void)
84 {
85    return "DBI";
86 }
87
88 /*
89  * Initialize database data structure. In principal this should
90  * never have errors, or it is really fatal.
91  */
92 B_DB *
93 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
94                  const char *db_address, int db_port, const char *db_socket,
95                  int mult_db_connections)
96 {
97    B_DB *mdb = NULL;
98    DBI_FIELD_GET *field;
99    char db_driver[10];
100    char db_driverdir[256];
101
102    /* Constraint the db_driver */
103    if(db_type  == -1) {
104       Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
105       return NULL;
106    }
107
108    /* Do the correct selection of driver.
109     * Can be one of the varius supported by libdbi
110     */
111    switch (db_type) {
112    case SQL_TYPE_MYSQL:
113       bstrncpy(db_driver,"mysql", sizeof(db_driver));
114       break;
115    case SQL_TYPE_POSTGRESQL:
116       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
117       break;
118    case SQL_TYPE_SQLITE:
119       bstrncpy(db_driver,"sqlite", sizeof(db_driver));
120       break;
121    case SQL_TYPE_SQLITE3:
122       bstrncpy(db_driver,"sqlite3", sizeof(db_driver));
123       break;
124    }
125
126    /* Set db_driverdir whereis is the libdbi drivers */
127    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
128
129    if (!db_user) {
130       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
131       return NULL;
132    }
133    P(mutex);                          /* lock DB queue */
134    if (db_list == NULL) {
135       db_list = New(dlist(mdb, &mdb->link));
136       dbi_getvalue_list = New(dlist(field, &field->link));
137    }
138    if (!mult_db_connections) {
139       /* Look to see if DB already open */
140       foreach_dlist(mdb, db_list) {
141          if (bstrcmp(mdb->db_name, db_name) &&
142              bstrcmp(mdb->db_address, db_address) &&
143              bstrcmp(mdb->db_driver, db_driver) &&
144              mdb->db_port == db_port) {
145             Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name,
146                   dbi_conn_error(mdb->db, NULL));
147             mdb->ref_count++;
148             V(mutex);
149             return mdb;                  /* already open */
150          }
151       }
152    }
153    Dmsg0(100, "db_open first time\n");
154    mdb = (B_DB *)malloc(sizeof(B_DB));
155    memset(mdb, 0, sizeof(B_DB));
156    mdb->db_name = bstrdup(db_name);
157    mdb->db_user = bstrdup(db_user);
158    if (db_password) {
159       mdb->db_password = bstrdup(db_password);
160    }
161    if (db_address) {
162       mdb->db_address  = bstrdup(db_address);
163    }
164    if (db_socket) {
165       mdb->db_socket   = bstrdup(db_socket);
166    }
167    if (db_driverdir) {
168       mdb->db_driverdir = bstrdup(db_driverdir);
169    }
170    if (db_driver) {
171       mdb->db_driver    = bstrdup(db_driver);
172    }
173    mdb->db_type        = db_type;
174    mdb->db_port        = db_port;
175    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
176    *mdb->errmsg        = 0;
177    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
178    mdb->cached_path    = get_pool_memory(PM_FNAME);
179    mdb->cached_path_id = 0;
180    mdb->ref_count      = 1;
181    mdb->fname          = get_pool_memory(PM_FNAME);
182    mdb->path           = get_pool_memory(PM_FNAME);
183    mdb->esc_name       = get_pool_memory(PM_FNAME);
184    mdb->esc_path      = get_pool_memory(PM_FNAME);
185    mdb->allow_transactions = mult_db_connections;
186    db_list->append(mdb);                   /* put db in list */
187    V(mutex);
188    return mdb;
189 }
190
191 /*
192  * Now actually open the database.  This can generate errors,
193  *   which are returned in the errmsg
194  *
195  * DO NOT close the database or free(mdb) here  !!!!
196  */
197 int
198 db_open_database(JCR *jcr, B_DB *mdb)
199 {
200    int errstat;
201    int dbstat;
202    uint8_t len;
203    const char *errmsg;
204    char buf[10], *port;
205    int numdrivers;
206    char *db_name = NULL;
207    char *db_dir = NULL;
208
209    P(mutex);
210    if (mdb->connected) {
211       V(mutex);
212       return 1;
213    }
214    mdb->connected = false;
215
216    if ((errstat=rwl_init(&mdb->lock)) != 0) {
217       berrno be;
218       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
219             be.bstrerror(errstat));
220       V(mutex);
221       return 0;
222    }
223
224    if (mdb->db_port) {
225       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
226       port = buf;
227    } else {
228       port = NULL;
229    }
230
231    numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
232    if (numdrivers < 0) {
233       Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
234                                "db_driverdir=%s. It is probaly not found any drivers\n"),
235                                mdb->db_driverdir,numdrivers);
236       V(mutex);
237       return 0;
238    }
239    mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
240    /* Can be many types of databases */
241    switch (mdb->db_type) {
242    case SQL_TYPE_MYSQL:
243       dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
244       dbi_conn_set_option(mdb->db, "port", port);            /* default port */
245       dbi_conn_set_option(mdb->db, "username", mdb->db_user);     /* login name */
246       dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
247       dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);       /* database name */
248       break;
249    case SQL_TYPE_POSTGRESQL:
250       dbi_conn_set_option(mdb->db, "host", mdb->db_address);
251       dbi_conn_set_option(mdb->db, "port", port);
252       dbi_conn_set_option(mdb->db, "username", mdb->db_user);
253       dbi_conn_set_option(mdb->db, "password", mdb->db_password);
254       dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);
255       break;
256    case SQL_TYPE_SQLITE:
257       len = strlen(working_directory) + 5;
258       db_dir = (char *)malloc(len);
259       strcpy(db_dir, working_directory);
260       strcat(db_dir, "/");
261       len = strlen(mdb->db_name) + 5;
262       db_name = (char *)malloc(len);
263       strcpy(db_name, mdb->db_name);
264       strcat(db_name, ".db");
265       dbi_conn_set_option(mdb->db, "sqlite_dbdir", db_dir);
266       dbi_conn_set_option(mdb->db, "dbname", db_name);
267       break;
268    case SQL_TYPE_SQLITE3:
269       len = strlen(working_directory) + 5;
270       db_dir = (char *)malloc(len);
271       strcpy(db_dir, working_directory);
272       strcat(db_dir, "/");
273       len = strlen(mdb->db_name) + 5;
274       db_name = (char *)malloc(len);
275       strcpy(db_name, mdb->db_name);
276       strcat(db_name, ".db");
277       dbi_conn_set_option(mdb->db, "sqlite3_dbdir", db_dir);
278       dbi_conn_set_option(mdb->db, "dbname", db_name);
279       Dmsg2(500, "SQLITE: %s %s\n", db_dir, db_name);
280       break;
281    }
282
283    /* If connection fails, try at 5 sec intervals for 30 seconds. */
284    for (int retry=0; retry < 6; retry++) {
285
286       dbstat = dbi_conn_connect(mdb->db);
287       if ( dbstat == 0) {
288          break;
289       }
290
291       dbi_conn_error(mdb->db, &errmsg);
292       Dmsg1(50, "dbi error: %s\n", errmsg);
293
294       bmicrosleep(5, 0);
295
296    }
297
298    if ( dbstat != 0 ) {
299       Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
300          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
301          mdb->db_driver, mdb->db_name, mdb->db_user);
302       V(mutex);
303       return 0;
304    }
305
306    Dmsg0(50, "dbi_real_connect done\n");
307    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
308                     mdb->db_user, mdb->db_name,
309                     mdb->db_password==NULL?"(NULL)":mdb->db_password);
310
311    mdb->connected = true;
312
313    if (!check_tables_version(jcr, mdb)) {
314       V(mutex);
315       return 0;
316    }
317
318    switch (mdb->db_type) {
319    case SQL_TYPE_MYSQL:
320       /* Set connection timeout to 8 days specialy for batch mode */
321       sql_query(mdb, "SET wait_timeout=691200");
322       sql_query(mdb, "SET interactive_timeout=691200");
323       break;
324    case SQL_TYPE_POSTGRESQL:
325       /* tell PostgreSQL we are using standard conforming strings
326          and avoid warnings such as:
327          WARNING:  nonstandard use of \\ in a string literal
328       */
329       sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
330       sql_query(mdb, "set standard_conforming_strings=on");
331       break;
332    }
333
334    if(db_dir) {
335       free(db_dir);
336    }
337    if(db_name) {
338       free(db_name);
339    }
340
341    V(mutex);
342    return 1;
343 }
344
345 void
346 db_close_database(JCR *jcr, B_DB *mdb)
347 {
348    if (!mdb) {
349       return;
350    }
351    db_end_transaction(jcr, mdb);
352    P(mutex);
353    sql_free_result(mdb);
354    mdb->ref_count--;
355    if (mdb->ref_count == 0) {
356       db_list->remove(mdb);
357       if (mdb->connected && mdb->db) {
358          //sql_close(mdb);
359          dbi_shutdown_r(mdb->instance);
360          mdb->db = NULL;
361          mdb->instance = NULL;
362       }
363       rwl_destroy(&mdb->lock);
364       free_pool_memory(mdb->errmsg);
365       free_pool_memory(mdb->cmd);
366       free_pool_memory(mdb->cached_path);
367       free_pool_memory(mdb->fname);
368       free_pool_memory(mdb->path);
369       free_pool_memory(mdb->esc_name);
370       free_pool_memory(mdb->esc_path);
371       if (mdb->db_name) {
372          free(mdb->db_name);
373       }
374       if (mdb->db_user) {
375          free(mdb->db_user);
376       }
377       if (mdb->db_password) {
378          free(mdb->db_password);
379       }
380       if (mdb->db_address) {
381          free(mdb->db_address);
382       }
383       if (mdb->db_socket) {
384          free(mdb->db_socket);
385       }
386       if (mdb->db_driverdir) {
387          free(mdb->db_driverdir);
388       }
389       if (mdb->db_driver) {
390           free(mdb->db_driver);
391       }
392       free(mdb);
393       if (db_list->size() == 0) {
394          delete db_list;
395          db_list = NULL;
396       }
397    }
398    V(mutex);
399 }
400
401 void db_thread_cleanup()
402 { }
403
404 /*
405  * Return the next unique index (auto-increment) for
406  * the given table.  Return NULL on error.
407  *
408  */
409 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
410 {
411    strcpy(index, "NULL");
412    return 1;
413 }
414
415
416 /*
417  * Escape strings so that DBI is happy
418  *
419  *   NOTE! len is the length of the old string. Your new
420  *         string must be long enough (max 2*old+1) to hold
421  *         the escaped output.
422  *
423  * dbi_conn_quote_string_copy receives a pointer to pointer.
424  * We need copy the value of pointer to snew because libdbi change the
425  * pointer
426  */
427 void
428 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
429 {
430    char *inew;
431    char *pnew;
432
433    if (len == 0) {
434       snew[0] = 0;
435    } else {
436       /* correct the size of old basead in len
437        * and copy new string to inew
438        */
439       inew = (char *)malloc(sizeof(char) * len + 1);
440       bstrncpy(inew,old,len + 1);
441       /* escape the correct size of old */
442       dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
443       free(inew);
444       /* copy the escaped string to snew */
445       bstrncpy(snew, pnew, 2 * len + 1);
446    }
447
448    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
449
450 }
451
452 /*
453  * Submit a general SQL command (cmd), and for each row returned,
454  *  the sqlite_handler is called with the ctx.
455  */
456 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
457 {
458    SQL_ROW row;
459
460    Dmsg0(500, "db_sql_query started\n");
461
462    db_lock(mdb);
463    if (sql_query(mdb, query) != 0) {
464       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
465       db_unlock(mdb);
466       Dmsg0(500, "db_sql_query failed\n");
467       return false;
468    }
469    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
470
471    if (result_handler != NULL) {
472       Dmsg0(500, "db_sql_query invoking handler\n");
473       if ((mdb->result = sql_store_result(mdb)) != NULL) {
474          int num_fields = sql_num_fields(mdb);
475
476          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
477          while ((row = sql_fetch_row(mdb)) != NULL) {
478
479             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
480             if (result_handler(ctx, num_fields, row))
481                break;
482          }
483
484         sql_free_result(mdb);
485       }
486    }
487    db_unlock(mdb);
488
489    Dmsg0(500, "db_sql_query finished\n");
490
491    return true;
492 }
493
494
495
496 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
497 {
498    int j;
499    DBI_ROW row = NULL; // by default, return NULL
500
501    Dmsg0(500, "my_dbi_fetch_row start\n");
502    if ((!mdb->row || mdb->row_size < mdb->num_fields) && mdb->num_rows > 0) {
503       int num_fields = mdb->num_fields;
504       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
505
506       if (mdb->row) {
507          Dmsg0(500, "my_dbi_fetch_row freeing space\n");
508          Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
509          if (mdb->num_rows != 0) {
510             for(j = 0; j < mdb->num_fields; j++) {
511                Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
512                   if(mdb->row[j]) {
513                      free(mdb->row[j]);
514                   }
515             }
516          }
517          free(mdb->row);
518       }
519       //num_fields += 20;                  /* add a bit extra */
520       mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
521       mdb->row_size = num_fields;
522
523       // now reset the row_number now that we have the space allocated
524       mdb->row_number = 1;
525    }
526
527    // if still within the result set
528    if (mdb->row_number <= mdb->num_rows && mdb->row_number != DBI_ERROR_BADPTR) {
529       Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
530       // get each value from this row
531       for (j = 0; j < mdb->num_fields; j++) {
532          mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
533          // allocate space to queue row
534          mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
535          // store the pointer in queue
536          mdb->field_get->value = mdb->row[j];
537          Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
538                j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
539          // insert in queue to future free
540          dbi_getvalue_list->append(mdb->field_get);
541       }
542       // increment the row number for the next call
543       mdb->row_number++;
544
545       row = mdb->row;
546    } else {
547       Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
548    }
549
550    Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
551
552    return row;
553 }
554
555 int my_dbi_max_length(B_DB *mdb, int field_num) {
556    //
557    // for a given column, find the max length
558    //
559    int max_length;
560    int i;
561    int this_length;
562    char *cbuf = NULL;
563
564    max_length = 0;
565    for (i = 0; i < mdb->num_rows; i++) {
566       if (my_dbi_getisnull(mdb->result, i, field_num)) {
567           this_length = 4;        // "NULL"
568       } else {
569          cbuf = my_dbi_getvalue(mdb->result, i, field_num);
570          this_length = cstrlen(cbuf);
571          // cbuf is always free
572          free(cbuf);
573       }
574
575       if (max_length < this_length) {
576           max_length = this_length;
577       }
578    }
579
580    return max_length;
581 }
582
583 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
584 {
585    int     i;
586    int     dbi_index;
587
588    Dmsg0(500, "my_dbi_fetch_field starts\n");
589
590    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
591       if (mdb->fields) {
592          free(mdb->fields);
593       }
594       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
595       mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
596       mdb->fields_size = mdb->num_fields;
597
598       for (i = 0; i < mdb->num_fields; i++) {
599          // num_fileds is starting at 1, increment i by 1
600          dbi_index = i + 1;
601          Dmsg1(500, "filling field %d\n", i);
602          mdb->fields[i].name       = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
603          mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
604          mdb->fields[i].type       = dbi_result_get_field_type_idx(mdb->result, dbi_index);
605          mdb->fields[i].flags      = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
606
607          Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
608             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
609             mdb->fields[i].flags);
610       } // end for
611    } // end if
612
613    // increment field number for the next time around
614
615    Dmsg0(500, "my_dbi_fetch_field finishes\n");
616    return &mdb->fields[mdb->field_number++];
617 }
618
619 void my_dbi_data_seek(B_DB *mdb, int row)
620 {
621    // set the row number to be returned on the next call
622    // to my_dbi_fetch_row
623    mdb->row_number = row;
624 }
625
626 void my_dbi_field_seek(B_DB *mdb, int field)
627 {
628    mdb->field_number = field;
629 }
630
631 /*
632  * Note, if this routine returns 1 (failure), Bacula expects
633  *  that no result has been stored.
634  *
635  *  Returns:  0  on success
636  *            1  on failure
637  *
638  */
639 int my_dbi_query(B_DB *mdb, const char *query)
640 {
641    const char *errmsg;
642    Dmsg1(500, "my_dbi_query started %s\n", query);
643    // We are starting a new query.  reset everything.
644    mdb->num_rows     = -1;
645    mdb->row_number   = -1;
646    mdb->field_number = -1;
647
648    if (mdb->result) {
649       dbi_result_free(mdb->result);  /* hmm, someone forgot to free?? */
650       mdb->result = NULL;
651    }
652
653    mdb->result = (void **)dbi_conn_query(mdb->db, query);
654
655    if (!mdb->result) {
656       Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);
657       goto bail_out;
658    }
659
660    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
661
662    if (mdb->status == DBI_ERROR_NONE) {
663       Dmsg1(500, "we have a result\n", query);
664
665       // how many fields in the set?
666       // num_fields starting at 1
667       mdb->num_fields = dbi_result_get_numfields(mdb->result);
668       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
669       // if no result num_rows is 0
670       mdb->num_rows = dbi_result_get_numrows(mdb->result);
671       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
672
673       mdb->status = (dbi_error_flag) 0;                  /* succeed */
674    } else {
675       Dmsg1(50, "Result status failed: %s\n", query);
676       goto bail_out;
677    }
678
679    Dmsg0(500, "my_dbi_query finishing\n");
680    return mdb->status;
681
682 bail_out:
683    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
684    //dbi_conn_error(mdb->db, &errmsg);
685    Dmsg4(500, "my_dbi_query we failed dbi error: "
686                    "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
687    dbi_result_free(mdb->result);
688    mdb->result = NULL;
689    mdb->status = (dbi_error_flag) 1;                   /* failed */
690    return mdb->status;
691 }
692
693 void my_dbi_free_result(B_DB *mdb)
694 {
695
696    DBI_FIELD_GET *f;
697    db_lock(mdb);
698    if (mdb->result) {
699       Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
700       dbi_result_free(mdb->result);
701    }
702
703    mdb->result = NULL;
704
705    if (mdb->row) {
706       free(mdb->row);
707    }
708
709    /* now is time to free all value return by my_dbi_get_value
710     * this is necessary because libdbi don't free memory return by yours results
711     * and Bacula has some routine wich call more than once time my_dbi_fetch_row
712     *
713     * Using a queue to store all pointer allocate is a good way to free all things
714     * when necessary
715     */
716    foreach_dlist(f, dbi_getvalue_list) {
717       Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
718       free(f->value);
719       free(f);
720    }
721
722    mdb->row = NULL;
723
724    if (mdb->fields) {
725       free(mdb->fields);
726       mdb->fields = NULL;
727    }
728    db_unlock(mdb);
729    Dmsg0(500, "my_dbi_free_result finish\n");
730
731 }
732
733 const char *my_dbi_strerror(B_DB *mdb)
734 {
735    const char *errmsg;
736
737    dbi_conn_error(mdb->db, &errmsg);
738
739    return errmsg;
740 }
741
742 #ifdef HAVE_BATCH_FILE_INSERT
743
744 /*
745  * This can be a bit strang but is the one way to do
746  *
747  * Returns 1 if OK
748  *         0 if failed
749  */
750 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
751 {
752    const char *query = "COPY batch FROM STDIN";
753
754    Dmsg0(500, "my_dbi_batch_start started\n");
755
756    switch (mdb->db_type) {
757    case SQL_TYPE_MYSQL:
758       db_lock(mdb);
759       if (my_dbi_query(mdb,
760                               "CREATE TEMPORARY TABLE batch ("
761                                   "FileIndex integer,"
762                                   "JobId integer,"
763                                   "Path blob,"
764                                   "Name blob,"
765                                   "LStat tinyblob,"
766                                   "MD5 tinyblob)") == 1)
767       {
768          Dmsg0(500, "my_dbi_batch_start failed\n");
769          return 1;
770       }
771       db_unlock(mdb);
772       Dmsg0(500, "my_dbi_batch_start finishing\n");
773       return 1;
774       break;
775    case SQL_TYPE_POSTGRESQL:
776
777       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
778                                   "fileindex int,"
779                                   "jobid int,"
780                                   "path varchar,"
781                                   "name varchar,"
782                                   "lstat varchar,"
783                                   "md5 varchar)") == 1)
784       {
785          Dmsg0(500, "my_dbi_batch_start failed\n");
786          return 1;
787       }
788
789       // We are starting a new query.  reset everything.
790       mdb->num_rows     = -1;
791       mdb->row_number   = -1;
792       mdb->field_number = -1;
793
794       my_dbi_free_result(mdb);
795
796       for (int i=0; i < 10; i++) {
797          my_dbi_query(mdb, query);
798          if (mdb->result) {
799             break;
800          }
801          bmicrosleep(5, 0);
802       }
803       if (!mdb->result) {
804          Dmsg1(50, "Query failed: %s\n", query);
805          goto bail_out;
806       }
807
808       mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
809       //mdb->status = DBI_ERROR_NONE;
810
811       if (mdb->status == DBI_ERROR_NONE) {
812          // how many fields in the set?
813          mdb->num_fields = dbi_result_get_numfields(mdb->result);
814          mdb->num_rows   = dbi_result_get_numrows(mdb->result);
815          mdb->status = (dbi_error_flag) 1;
816       } else {
817          Dmsg1(50, "Result status failed: %s\n", query);
818          goto bail_out;
819       }
820
821       Dmsg0(500, "my_postgresql_batch_start finishing\n");
822
823       return mdb->status;
824       break;
825    case SQL_TYPE_SQLITE:
826       db_lock(mdb);
827       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
828                                   "FileIndex integer,"
829                                   "JobId integer,"
830                                   "Path blob,"
831                                   "Name blob,"
832                                   "LStat tinyblob,"
833                                   "MD5 tinyblob)") == 1)
834       {
835          Dmsg0(500, "my_dbi_batch_start failed\n");
836          goto bail_out;
837       }
838       db_unlock(mdb);
839       Dmsg0(500, "my_dbi_batch_start finishing\n");
840       return 1;
841       break;
842    case SQL_TYPE_SQLITE3:
843       db_lock(mdb);
844       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
845                                   "FileIndex integer,"
846                                   "JobId integer,"
847                                   "Path blob,"
848                                   "Name blob,"
849                                   "LStat tinyblob,"
850                                   "MD5 tinyblob)") == 1)
851       {
852          Dmsg0(500, "my_dbi_batch_start failed\n");
853          goto bail_out;
854       }
855       db_unlock(mdb);
856       Dmsg0(500, "my_dbi_batch_start finishing\n");
857       return 1;
858       break;
859    }
860
861 bail_out:
862    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
863    mdb->status = (dbi_error_flag) 0;
864    my_dbi_free_result(mdb);
865    mdb->result = NULL;
866    return mdb->status;
867 }
868
869 /* set error to something to abort operation */
870 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
871 {
872    int res = 0;
873    int count = 30;
874    int (*custom_function)(void*, const char*) = NULL;
875    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
876
877    Dmsg0(500, "my_dbi_batch_end started\n");
878
879    if (!mdb) {                  /* no files ? */
880       return 0;
881    }
882
883    switch (mdb->db_type) {
884    case SQL_TYPE_MYSQL:
885       if(mdb) {
886          mdb->status = (dbi_error_flag) 0;
887       }
888       break;
889    case SQL_TYPE_POSTGRESQL:
890       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
891
892
893       do {
894          res = (*custom_function)(myconn->connection, error);
895       } while (res == 0 && --count > 0);
896
897       if (res == 1) {
898          Dmsg0(500, "ok\n");
899          mdb->status = (dbi_error_flag) 1;
900       }
901
902       if (res <= 0) {
903          Dmsg0(500, "we failed\n");
904          mdb->status = (dbi_error_flag) 0;
905          //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
906        }
907       break;
908    case SQL_TYPE_SQLITE:
909       if(mdb) {
910          mdb->status = (dbi_error_flag) 0;
911       }
912       break;
913    case SQL_TYPE_SQLITE3:
914       if(mdb) {
915          mdb->status = (dbi_error_flag) 0;
916       }
917       break;
918    }
919
920    Dmsg0(500, "my_dbi_batch_end finishing\n");
921
922    return true;
923 }
924
925 /*
926  * This function is big and use a big switch.
927  * In near future is better split in small functions
928  * and refactory.
929  *
930  */
931 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
932 {
933    int res;
934    int count=30;
935    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
936    int (*custom_function)(void*, const char*, int) = NULL;
937    char* (*custom_function_error)(void*) = NULL;
938    size_t len;
939    char *digest;
940    char ed1[50];
941
942    Dmsg0(500, "my_dbi_batch_insert started \n");
943
944    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
945    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
946
947    if (ar->Digest == NULL || ar->Digest[0] == 0) {
948       *digest = '\0';
949    } else {
950       digest = ar->Digest;
951    }
952
953    switch (mdb->db_type) {
954    case SQL_TYPE_MYSQL:
955       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
956       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
957       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
958                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
959                       mdb->esc_name, ar->attr, digest);
960
961       if (my_dbi_query(mdb,mdb->cmd) == 1)
962       {
963          Dmsg0(500, "my_dbi_batch_insert failed\n");
964          goto bail_out;
965       }
966
967       Dmsg0(500, "my_dbi_batch_insert finishing\n");
968
969       return 1;
970       break;
971    case SQL_TYPE_POSTGRESQL:
972       my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
973       my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
974       len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
975                      ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
976                      mdb->esc_name, ar->attr, digest);
977
978       /* libdbi don't support CopyData and we need call a postgresql
979        * specific function to do this work
980        */
981       Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
982       if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
983             "PQputCopyData")) != NULL) {
984          do {
985             res = (*custom_function)(myconn->connection, mdb->cmd, len);
986          } while (res == 0 && --count > 0);
987
988          if (res == 1) {
989             Dmsg0(500, "ok\n");
990             mdb->changes++;
991             mdb->status = (dbi_error_flag) 1;
992          }
993
994          if (res <= 0) {
995             Dmsg0(500, "my_dbi_batch_insert failed\n");
996             goto bail_out;
997          }
998
999          Dmsg0(500, "my_dbi_batch_insert finishing\n");
1000          return mdb->status;
1001       } else {
1002          // ensure to detect a PQerror
1003          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
1004          Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
1005          goto bail_out;
1006       }
1007       break;
1008    case SQL_TYPE_SQLITE:
1009       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1010       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1011       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1012                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1013                       mdb->esc_name, ar->attr, digest);
1014       if (my_dbi_query(mdb,mdb->cmd) == 1)
1015       {
1016          Dmsg0(500, "my_dbi_batch_insert failed\n");
1017          goto bail_out;
1018       }
1019
1020       Dmsg0(500, "my_dbi_batch_insert finishing\n");
1021
1022       return 1;
1023       break;
1024    case SQL_TYPE_SQLITE3:
1025       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1026       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1027       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1028                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1029                       mdb->esc_name, ar->attr, digest);
1030       if (my_dbi_query(mdb,mdb->cmd) == 1)
1031       {
1032          Dmsg0(500, "my_dbi_batch_insert failed\n");
1033          goto bail_out;
1034       }
1035
1036       Dmsg0(500, "my_dbi_batch_insert finishing\n");
1037
1038       return 1;
1039       break;
1040    }
1041
1042 bail_out:
1043   Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
1044   mdb->status = (dbi_error_flag) 0;
1045   my_dbi_free_result(mdb);
1046   return mdb->status;
1047 }
1048
1049 /*
1050  * Escape strings so that PostgreSQL is happy on COPY
1051  *
1052  *   NOTE! len is the length of the old string. Your new
1053  *         string must be long enough (max 2*old+1) to hold
1054  *         the escaped output.
1055  */
1056 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
1057 {
1058    /* we have to escape \t, \n, \r, \ */
1059    char c = '\0' ;
1060
1061    while (len > 0 && *src) {
1062       switch (*src) {
1063       case '\n':
1064          c = 'n';
1065          break;
1066       case '\\':
1067          c = '\\';
1068          break;
1069       case '\t':
1070          c = 't';
1071          break;
1072       case '\r':
1073          c = 'r';
1074          break;
1075       default:
1076          c = '\0' ;
1077       }
1078
1079       if (c) {
1080          *dest = '\\';
1081          dest++;
1082          *dest = c;
1083       } else {
1084          *dest = *src;
1085       }
1086
1087       len--;
1088       src++;
1089       dest++;
1090    }
1091
1092    *dest = '\0';
1093    return dest;
1094 }
1095
1096 #endif /* HAVE_BATCH_FILE_INSERT */
1097
1098 /* my_dbi_getisnull
1099  * like PQgetisnull
1100  * int PQgetisnull(const PGresult *res,
1101  *              int row_number,
1102  *               int column_number);
1103  *
1104  *  use dbi_result_seek_row to search in result set
1105  */
1106 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1107    int i;
1108
1109    if(row_number == 0) {
1110       row_number++;
1111    }
1112
1113    column_number++;
1114
1115    if(dbi_result_seek_row(result, row_number)) {
1116
1117       i = dbi_result_field_is_null_idx(result,column_number);
1118
1119       return i;
1120    } else {
1121
1122       return 0;
1123    }
1124
1125 }
1126 /* my_dbi_getvalue
1127  * like PQgetvalue;
1128  * char *PQgetvalue(const PGresult *res,
1129  *                int row_number,
1130  *                int column_number);
1131  *
1132  * use dbi_result_seek_row to search in result set
1133  * use example to return only strings
1134  */
1135 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1136
1137    char *buf = NULL;
1138    const char *errmsg;
1139    const char *field_name;
1140    unsigned short dbitype;
1141    size_t field_length;
1142    int64_t num;
1143
1144    /* correct the index for dbi interface
1145     * dbi index begins 1
1146     * I prefer do not change others functions
1147     */
1148    Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
1149                                 result, row_number, column_number);
1150
1151    column_number++;
1152
1153    if(row_number == 0) {
1154      row_number++;
1155    }
1156
1157    Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
1158                         result, row_number, column_number);
1159
1160    if(dbi_result_seek_row(result, row_number)) {
1161
1162       field_name = dbi_result_get_field_name(result, column_number);
1163       field_length = dbi_result_get_field_length(result, field_name);
1164       dbitype = dbi_result_get_field_type_idx(result,column_number);
1165
1166       Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
1167             "field_length bytes: '%d' fieldname: '%s'\n",
1168             dbitype, field_length, field_name);
1169
1170       if(field_length) {
1171          //buf = (char *)malloc(sizeof(char *) * field_length + 1);
1172          buf = (char *)malloc(field_length + 1);
1173       } else {
1174          /* if numbers */
1175          buf = (char *)malloc(sizeof(char *) * 50);
1176       }
1177
1178       switch (dbitype) {
1179       case DBI_TYPE_INTEGER:
1180          num = dbi_result_get_longlong(result, field_name);
1181          edit_int64(num, buf);
1182          field_length = strlen(buf);
1183          break;
1184       case DBI_TYPE_STRING:
1185          if(field_length) {
1186             field_length = bsnprintf(buf, field_length + 1, "%s",
1187             dbi_result_get_string(result, field_name));
1188          } else {
1189             buf[0] = 0;
1190          }
1191          break;
1192       case DBI_TYPE_BINARY:
1193          /* dbi_result_get_binary return a NULL pointer if value is empty
1194          * following, change this to what Bacula espected
1195          */
1196          if(field_length) {
1197             field_length = bsnprintf(buf, field_length + 1, "%s",
1198                   dbi_result_get_binary(result, field_name));
1199          } else {
1200             buf[0] = 0;
1201          }
1202          break;
1203       case DBI_TYPE_DATETIME:
1204          time_t last;
1205          struct tm tm;
1206
1207          last = dbi_result_get_datetime(result, field_name);
1208
1209          if(last == -1) {
1210                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
1211          } else {
1212             (void)localtime_r(&last, &tm);
1213             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1214                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1215                   tm.tm_hour, tm.tm_min, tm.tm_sec);
1216          }
1217          break;
1218       }
1219
1220    } else {
1221       dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1222       Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1223    }
1224
1225    Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
1226       buf, field_length, buf);
1227
1228    // don't worry about this buf
1229    return buf;
1230 }
1231
1232 static int my_dbi_sequence_last(B_DB *mdb, char *table_name)
1233 {
1234    /*
1235     Obtain the current value of the sequence that
1236     provides the serial value for primary key of the table.
1237
1238     currval is local to our session.  It is not affected by
1239     other transactions.
1240
1241     Determine the name of the sequence.
1242     PostgreSQL automatically creates a sequence using
1243     <table>_<column>_seq.
1244     At the time of writing, all tables used this format for
1245     for their primary key: <table>id
1246     Except for basefiles which has a primary key on baseid.
1247     Therefore, we need to special case that one table.
1248
1249     everything else can use the PostgreSQL formula.
1250    */
1251
1252    char sequence[30];
1253    uint64_t id = 0;
1254
1255    if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
1256
1257       if (strcasecmp(table_name, "basefiles") == 0) {
1258          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1259       } else {
1260          bstrncpy(sequence, table_name, sizeof(sequence));
1261          bstrncat(sequence, "_",        sizeof(sequence));
1262          bstrncat(sequence, table_name, sizeof(sequence));
1263          bstrncat(sequence, "id",       sizeof(sequence));
1264       }
1265
1266       bstrncat(sequence, "_seq", sizeof(sequence));
1267       id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1268    } else {
1269       id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
1270    }
1271
1272    return id;
1273 }
1274
1275 int my_dbi_sql_insert_id(B_DB *mdb, const char *query, const char *table_name)
1276 {
1277    /*
1278     * First execute the insert query and then retrieve the currval.
1279     */
1280    if (!my_dbi_query(mdb, query)) {
1281       return 0;
1282    }
1283
1284    mdb->num_rows = sql_affected_rows(mdb);
1285    if (mdb->num_rows != 1) {
1286       return 0;
1287    }
1288
1289    mdb->changes++;
1290
1291    return my_dbi_sequence_last(mdb, table_name);
1292 }
1293
1294 #ifdef HAVE_BATCH_FILE_INSERT
1295 const char *my_dbi_batch_lock_path_query[5] = {
1296    /* Mysql */
1297    "LOCK TABLES Path write, batch write, Path as p write",
1298    /* Postgresql */
1299    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
1300    /* SQLite */
1301    "BEGIN",
1302    /* SQLite3 */
1303    "BEGIN",
1304    /* Ingres */
1305    "BEGIN"
1306 };
1307
1308 const char *my_dbi_batch_lock_filename_query[5] = {
1309    /* Mysql */
1310    "LOCK TABLES Filename write, batch write, Filename as f write",
1311    /* Postgresql */
1312    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
1313    /* SQLite */
1314    "BEGIN",
1315    /* SQLite3 */
1316    "BEGIN",
1317    /* Ingres */
1318    "BEGIN"
1319 };
1320
1321 const char *my_dbi_batch_unlock_tables_query[5] = {
1322    /* Mysql */
1323    "UNLOCK TABLES",
1324    /* Postgresql */
1325    "COMMIT",
1326    /* SQLite */
1327    "COMMIT",
1328    /* SQLite3 */
1329    "COMMIT",
1330    /* Ingres */
1331    "COMMIT"
1332 };
1333
1334 const char *my_dbi_batch_fill_path_query[5] = {
1335    /* Mysql */
1336    "INSERT INTO Path (Path) "
1337    "SELECT a.Path FROM "
1338    "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1339    "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1340    /* Postgresql */
1341    "INSERT INTO Path (Path) "
1342    "SELECT a.Path FROM "
1343    "(SELECT DISTINCT Path FROM batch) AS a "
1344    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1345    /* SQLite */
1346    "INSERT INTO Path (Path)"
1347    " SELECT DISTINCT Path FROM batch"
1348    " EXCEPT SELECT Path FROM Path",
1349    /* SQLite3 */
1350    "INSERT INTO Path (Path)"
1351    " SELECT DISTINCT Path FROM batch"
1352    " EXCEPT SELECT Path FROM Path",
1353    /* Ingres */
1354    "INSERT INTO Path (Path) "
1355    "SELECT a.Path FROM "
1356    "(SELECT DISTINCT Path FROM batch) AS a "
1357    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "
1358 };
1359
1360 const char *my_dbi_batch_fill_filename_query[5] = {
1361    /* Mysql */
1362    "INSERT INTO Filename (Name) "
1363    "SELECT a.Name FROM "
1364    "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1365    "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1366    /* Postgresql */
1367    "INSERT INTO Filename (Name) "
1368    "SELECT a.Name FROM "
1369    "(SELECT DISTINCT Name FROM batch) as a "
1370    "WHERE NOT EXISTS "
1371    "(SELECT Name FROM Filename WHERE Name = a.Name)",
1372    /* SQLite */
1373    "INSERT INTO Filename (Name)"
1374    " SELECT DISTINCT Name FROM batch "
1375    " EXCEPT SELECT Name FROM Filename",
1376    /* SQLite3 */
1377    "INSERT INTO Filename (Name)"
1378    " SELECT DISTINCT Name FROM batch "
1379    " EXCEPT SELECT Name FROM Filename",
1380    /* Ingres */
1381    "INSERT INTO Filename (Name) "
1382    "SELECT a.Name FROM "
1383    "(SELECT DISTINCT Name FROM batch) as a "
1384    "WHERE NOT EXISTS "
1385    "(SELECT Name FROM Filename WHERE Name = a.Name)"
1386 };
1387
1388 #endif /* HAVE_BATCH_FILE_INSERT */
1389
1390 const char *my_dbi_match[5] = {
1391    /* Mysql */
1392    "MATCH",
1393    /* Postgresql */
1394    "~",
1395    /* SQLite */
1396    "MATCH",
1397    /* SQLite3 */
1398    "MATCH",
1399    /* Ingres */
1400    "~"
1401 };
1402
1403 #endif /* HAVE_DBI */