]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
Free db_list when not used
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  */
37
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #ifdef HAVE_DBI
48
49 /* -----------------------------------------------------------------------
50  *
51  *   DBI dependent defines and subroutines
52  *
53  * -----------------------------------------------------------------------
54  */
55
56 /* List of open databases */
57 static dlist *db_list = NULL;
58
59 /* Control allocated fields by my_dbi_getvalue */
60 static dlist *dbi_getvalue_list = NULL;
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "DBI";
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb = NULL;
83    DBI_FIELD_GET *field;
84    char db_driver[10];
85    char db_driverdir[256];
86
87    /* Constraint the db_driver */
88    if(db_type  == -1) {
89       Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
90       return NULL;
91    }
92
93    /* Do the correct selection of driver.
94     * Can be one of the varius supported by libdbi
95     */
96    switch (db_type) {
97    case SQL_TYPE_MYSQL:
98       bstrncpy(db_driver,"mysql", sizeof(db_driver));
99       break;
100    case SQL_TYPE_POSTGRESQL:
101       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
102       break;
103    case SQL_TYPE_SQLITE:
104       bstrncpy(db_driver,"sqlite", sizeof(db_driver));
105       break;
106    case SQL_TYPE_SQLITE3:
107       bstrncpy(db_driver,"sqlite3", sizeof(db_driver));
108       break;
109    }
110
111    /* Set db_driverdir whereis is the libdbi drivers */
112    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
113
114    if (!db_user) {
115       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
116       return NULL;
117    }
118    P(mutex);                          /* lock DB queue */
119    if (db_list == NULL) {
120       db_list = New(dlist(mdb, &mdb->link));
121       db_getvalue_list = New(dlist(field, field->link));
122    }
123    if (!mult_db_connections) {
124       /* Look to see if DB already open */
125       foreach_dlist(mdb, db_list) {
126          if (bstrcmp(mdb->db_name, db_name) &&
127              bstrcmp(mdb->db_address, db_address) &&
128              bstrcmp(mdb->db_driver, db_driver) &&
129              mdb->db_port == db_port) {
130             Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name,
131                   dbi_conn_error(mdb->db, NULL));
132             mdb->ref_count++;
133             V(mutex);
134             return mdb;                  /* already open */
135          }
136       }
137    }
138    Dmsg0(100, "db_open first time\n");
139    mdb = (B_DB *)malloc(sizeof(B_DB));
140    memset(mdb, 0, sizeof(B_DB));
141    mdb->db_name = bstrdup(db_name);
142    mdb->db_user = bstrdup(db_user);
143    if (db_password) {
144       mdb->db_password = bstrdup(db_password);
145    }
146    if (db_address) {
147       mdb->db_address  = bstrdup(db_address);
148    }
149    if (db_socket) {
150       mdb->db_socket   = bstrdup(db_socket);
151    }
152    if (db_driverdir) {
153       mdb->db_driverdir = bstrdup(db_driverdir);
154    }
155    if (db_driver) {
156       mdb->db_driver    = bstrdup(db_driver);
157    }
158    mdb->db_type        = db_type;
159    mdb->db_port        = db_port;
160    mdb->have_insert_id = TRUE;
161    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
162    *mdb->errmsg        = 0;
163    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
164    mdb->cached_path    = get_pool_memory(PM_FNAME);
165    mdb->cached_path_id = 0;
166    mdb->ref_count      = 1;
167    mdb->fname          = get_pool_memory(PM_FNAME);
168    mdb->path           = get_pool_memory(PM_FNAME);
169    mdb->esc_name       = get_pool_memory(PM_FNAME);
170    mdb->esc_path      = get_pool_memory(PM_FNAME);
171    mdb->allow_transactions = mult_db_connections;
172    db_list->append(mdb);                   /* put db in list */
173    V(mutex);
174    return mdb;
175 }
176
177 /*
178  * Now actually open the database.  This can generate errors,
179  *   which are returned in the errmsg
180  *
181  * DO NOT close the database or free(mdb) here  !!!!
182  */
183 int
184 db_open_database(JCR *jcr, B_DB *mdb)
185 {
186    int errstat;
187    int dbstat;
188    uint8_t len;
189    const char *errmsg;
190    char buf[10], *port;
191    int numdrivers;
192    char *db_name = NULL;
193    char *db_dir = NULL;
194
195    P(mutex);
196    if (mdb->connected) {
197       V(mutex);
198       return 1;
199    }
200    mdb->connected = false;
201
202    if ((errstat=rwl_init(&mdb->lock)) != 0) {
203       berrno be;
204       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
205             be.bstrerror(errstat));
206       V(mutex);
207       return 0;
208    }
209
210    if (mdb->db_port) {
211       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
212       port = buf;
213    } else {
214       port = NULL;
215    }
216
217    numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
218    if (numdrivers < 0) {
219       Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
220                                "db_driverdir=%s. It is probaly not found any drivers\n"),
221                                mdb->db_driverdir,numdrivers);
222       V(mutex);
223       return 0;
224    }
225    mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
226    /* Can be many types of databases */
227    switch (mdb->db_type) {
228    case SQL_TYPE_MYSQL:
229       dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
230       dbi_conn_set_option(mdb->db, "port", port);            /* default port */
231       dbi_conn_set_option(mdb->db, "username", mdb->db_user);     /* login name */
232       dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
233       dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);       /* database name */
234       break;
235    case SQL_TYPE_POSTGRESQL:
236       dbi_conn_set_option(mdb->db, "host", mdb->db_address);
237       dbi_conn_set_option(mdb->db, "port", port);
238       dbi_conn_set_option(mdb->db, "username", mdb->db_user);
239       dbi_conn_set_option(mdb->db, "password", mdb->db_password);
240       dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);
241       break;
242    case SQL_TYPE_SQLITE:
243       len = strlen(working_directory) + 5;
244       db_dir = (char *)malloc(len);
245       strcpy(db_dir, working_directory);
246       strcat(db_dir, "/");
247       len = strlen(mdb->db_name) + 5;
248       db_name = (char *)malloc(len);
249       strcpy(db_name, mdb->db_name);
250       strcat(db_name, ".db");
251       dbi_conn_set_option(mdb->db, "sqlite_dbdir", db_dir);
252       dbi_conn_set_option(mdb->db, "dbname", db_name);
253       break;
254    case SQL_TYPE_SQLITE3:
255       len = strlen(working_directory) + 5;
256       db_dir = (char *)malloc(len);
257       strcpy(db_dir, working_directory);
258       strcat(db_dir, "/");
259       len = strlen(mdb->db_name) + 5;
260       db_name = (char *)malloc(len);
261       strcpy(db_name, mdb->db_name);
262       strcat(db_name, ".db");
263       dbi_conn_set_option(mdb->db, "sqlite3_dbdir", db_dir);
264       dbi_conn_set_option(mdb->db, "dbname", db_name);
265       Dmsg2(500, "SQLITE: %s %s\n", db_dir, db_name);
266       break;
267    }
268
269    /* If connection fails, try at 5 sec intervals for 30 seconds. */
270    for (int retry=0; retry < 6; retry++) {
271
272       dbstat = dbi_conn_connect(mdb->db);
273       if ( dbstat == 0) {
274          break;
275       }
276
277       dbi_conn_error(mdb->db, &errmsg);
278       Dmsg1(50, "dbi error: %s\n", errmsg);
279
280       bmicrosleep(5, 0);
281
282    }
283
284    if ( dbstat != 0 ) {
285       Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface. Type=%s Database=%s User=%s\n"
286          "Possible causes: SQL server not running; password incorrect; max_connections exceeded.\n"),
287          mdb->db_driver, mdb->db_name, mdb->db_user);
288       V(mutex);
289       return 0;
290    }
291
292    Dmsg0(50, "dbi_real_connect done\n");
293    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
294                     mdb->db_user, mdb->db_name,
295                     mdb->db_password==NULL?"(NULL)":mdb->db_password);
296
297    mdb->connected = true;
298
299    if (!check_tables_version(jcr, mdb)) {
300       V(mutex);
301       return 0;
302    }
303
304    switch (mdb->db_type) {
305    case SQL_TYPE_MYSQL:
306       /* Set connection timeout to 8 days specialy for batch mode */
307       sql_query(mdb, "SET wait_timeout=691200");
308       sql_query(mdb, "SET interactive_timeout=691200");
309       break;
310    case SQL_TYPE_POSTGRESQL:
311       /* tell PostgreSQL we are using standard conforming strings
312          and avoid warnings such as:
313          WARNING:  nonstandard use of \\ in a string literal
314       */
315       sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
316       sql_query(mdb, "set standard_conforming_strings=on");
317       break;
318    }
319
320    if(db_dir) {
321       free(db_dir);
322    }
323    if(db_name) {
324       free(db_name);
325    }
326
327    V(mutex);
328    return 1;
329 }
330
331 void
332 db_close_database(JCR *jcr, B_DB *mdb)
333 {
334    if (!mdb) {
335       return;
336    }
337    db_end_transaction(jcr, mdb);
338    P(mutex);
339    sql_free_result(mdb);
340    mdb->ref_count--;
341    if (mdb->ref_count == 0) {
342       db_list->remove(mdb);
343       if (mdb->connected && mdb->db) {
344          //sql_close(mdb);
345          dbi_shutdown_r(mdb->instance);
346          mdb->db = NULL;
347          mdb->instance = NULL;
348       }
349       rwl_destroy(&mdb->lock);
350       free_pool_memory(mdb->errmsg);
351       free_pool_memory(mdb->cmd);
352       free_pool_memory(mdb->cached_path);
353       free_pool_memory(mdb->fname);
354       free_pool_memory(mdb->path);
355       free_pool_memory(mdb->esc_name);
356       free_pool_memory(mdb->esc_path);
357       if (mdb->db_name) {
358          free(mdb->db_name);
359       }
360       if (mdb->db_user) {
361          free(mdb->db_user);
362       }
363       if (mdb->db_password) {
364          free(mdb->db_password);
365       }
366       if (mdb->db_address) {
367          free(mdb->db_address);
368       }
369       if (mdb->db_socket) {
370          free(mdb->db_socket);
371       }
372       if (mdb->db_driverdir) {
373          free(mdb->db_driverdir);
374       }
375       if (mdb->db_driver) {
376           free(mdb->db_driver);
377       }
378       free(mdb);
379       if (db_list->size() == 0) {
380          delete db_list;
381          db_list = NULL;
382       }
383    }
384    V(mutex);
385 }
386
387 void db_thread_cleanup()
388 { }
389
390 /*
391  * Return the next unique index (auto-increment) for
392  * the given table.  Return NULL on error.
393  *
394  */
395 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
396 {
397    strcpy(index, "NULL");
398    return 1;
399 }
400
401
402 /*
403  * Escape strings so that DBI is happy
404  *
405  *   NOTE! len is the length of the old string. Your new
406  *         string must be long enough (max 2*old+1) to hold
407  *         the escaped output.
408  *
409  * dbi_conn_quote_string_copy receives a pointer to pointer.
410  * We need copy the value of pointer to snew because libdbi change the
411  * pointer
412  */
413 void
414 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
415 {
416    char *inew;
417    char *pnew;
418
419    if (len == 0) {
420       snew[0] = 0;
421    } else {
422       /* correct the size of old basead in len
423        * and copy new string to inew
424        */
425       inew = (char *)malloc(sizeof(char) * len + 1);
426       bstrncpy(inew,old,len + 1);
427       /* escape the correct size of old */
428       dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
429       free(inew);
430       /* copy the escaped string to snew */
431       bstrncpy(snew, pnew, 2 * len + 1);
432    }
433
434    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
435
436 }
437
438 /*
439  * Submit a general SQL command (cmd), and for each row returned,
440  *  the sqlite_handler is called with the ctx.
441  */
442 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
443 {
444    SQL_ROW row;
445
446    Dmsg0(500, "db_sql_query started\n");
447
448    db_lock(mdb);
449    if (sql_query(mdb, query) != 0) {
450       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
451       db_unlock(mdb);
452       Dmsg0(500, "db_sql_query failed\n");
453       return false;
454    }
455    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
456
457    if (result_handler != NULL) {
458       Dmsg0(500, "db_sql_query invoking handler\n");
459       if ((mdb->result = sql_store_result(mdb)) != NULL) {
460          int num_fields = sql_num_fields(mdb);
461
462          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
463          while ((row = sql_fetch_row(mdb)) != NULL) {
464
465             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
466             if (result_handler(ctx, num_fields, row))
467                break;
468          }
469
470         sql_free_result(mdb);
471       }
472    }
473    db_unlock(mdb);
474
475    Dmsg0(500, "db_sql_query finished\n");
476
477    return true;
478 }
479
480
481
482 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
483 {
484    int j;
485    DBI_ROW row = NULL; // by default, return NULL
486
487    Dmsg0(500, "my_dbi_fetch_row start\n");
488    if ((!mdb->row || mdb->row_size < mdb->num_fields) && mdb->num_rows > 0) {
489       int num_fields = mdb->num_fields;
490       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
491
492       if (mdb->row) {
493          Dmsg0(500, "my_dbi_fetch_row freeing space\n");
494          Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
495          if (mdb->num_rows != 0) {
496             for(j = 0; j < mdb->num_fields; j++) {
497                Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
498                   if(mdb->row[j]) {
499                      free(mdb->row[j]);
500                   }
501             }
502          }
503          free(mdb->row);
504       }
505       //num_fields += 20;                  /* add a bit extra */
506       mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
507       mdb->row_size = num_fields;
508
509       // now reset the row_number now that we have the space allocated
510       mdb->row_number = 1;
511    }
512
513    // if still within the result set
514    if (mdb->row_number <= mdb->num_rows && mdb->row_number != DBI_ERROR_BADPTR) {
515       Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
516       // get each value from this row
517       for (j = 0; j < mdb->num_fields; j++) {
518          mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
519          // allocate space to queue row
520          mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
521          // store the pointer in queue
522          mdb->field_get->value = mdb->row[j];
523          Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
524                j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
525          // insert in queue to future free
526          dbi_getvalue_list->append(mdb->field_get);
527       }
528       // increment the row number for the next call
529       mdb->row_number++;
530
531       row = mdb->row;
532    } else {
533       Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
534    }
535
536    Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
537
538    return row;
539 }
540
541 int my_dbi_max_length(B_DB *mdb, int field_num) {
542    //
543    // for a given column, find the max length
544    //
545    int max_length;
546    int i;
547    int this_length;
548    char *cbuf = NULL;
549
550    max_length = 0;
551    for (i = 0; i < mdb->num_rows; i++) {
552       if (my_dbi_getisnull(mdb->result, i, field_num)) {
553           this_length = 4;        // "NULL"
554       } else {
555          cbuf = my_dbi_getvalue(mdb->result, i, field_num);
556          this_length = cstrlen(cbuf);
557          // cbuf is always free
558          free(cbuf);
559       }
560
561       if (max_length < this_length) {
562           max_length = this_length;
563       }
564    }
565
566    return max_length;
567 }
568
569 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
570 {
571    int     i;
572    int     dbi_index;
573
574    Dmsg0(500, "my_dbi_fetch_field starts\n");
575
576    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
577       if (mdb->fields) {
578          free(mdb->fields);
579       }
580       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
581       mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
582       mdb->fields_size = mdb->num_fields;
583
584       for (i = 0; i < mdb->num_fields; i++) {
585          // num_fileds is starting at 1, increment i by 1
586          dbi_index = i + 1;
587          Dmsg1(500, "filling field %d\n", i);
588          mdb->fields[i].name       = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
589          mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
590          mdb->fields[i].type       = dbi_result_get_field_type_idx(mdb->result, dbi_index);
591          mdb->fields[i].flags      = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
592
593          Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
594             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
595             mdb->fields[i].flags);
596       } // end for
597    } // end if
598
599    // increment field number for the next time around
600
601    Dmsg0(500, "my_dbi_fetch_field finishes\n");
602    return &mdb->fields[mdb->field_number++];
603 }
604
605 void my_dbi_data_seek(B_DB *mdb, int row)
606 {
607    // set the row number to be returned on the next call
608    // to my_dbi_fetch_row
609    mdb->row_number = row;
610 }
611
612 void my_dbi_field_seek(B_DB *mdb, int field)
613 {
614    mdb->field_number = field;
615 }
616
617 /*
618  * Note, if this routine returns 1 (failure), Bacula expects
619  *  that no result has been stored.
620  *
621  *  Returns:  0  on success
622  *            1  on failure
623  *
624  */
625 int my_dbi_query(B_DB *mdb, const char *query)
626 {
627    const char *errmsg;
628    Dmsg1(500, "my_dbi_query started %s\n", query);
629    // We are starting a new query.  reset everything.
630    mdb->num_rows     = -1;
631    mdb->row_number   = -1;
632    mdb->field_number = -1;
633
634    if (mdb->result) {
635       dbi_result_free(mdb->result);  /* hmm, someone forgot to free?? */
636       mdb->result = NULL;
637    }
638
639    mdb->result = (void **)dbi_conn_query(mdb->db, query);
640
641    if (!mdb->result) {
642       Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);
643       goto bail_out;
644    }
645
646    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
647
648    if (mdb->status == DBI_ERROR_NONE) {
649       Dmsg1(500, "we have a result\n", query);
650
651       // how many fields in the set?
652       // num_fields starting at 1
653       mdb->num_fields = dbi_result_get_numfields(mdb->result);
654       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
655       // if no result num_rows is 0
656       mdb->num_rows = dbi_result_get_numrows(mdb->result);
657       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
658
659       mdb->status = (dbi_error_flag) 0;                  /* succeed */
660    } else {
661       Dmsg1(50, "Result status failed: %s\n", query);
662       goto bail_out;
663    }
664
665    Dmsg0(500, "my_dbi_query finishing\n");
666    return mdb->status;
667
668 bail_out:
669    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
670    //dbi_conn_error(mdb->db, &errmsg);
671    Dmsg4(500, "my_dbi_query we failed dbi error: "
672                    "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
673    dbi_result_free(mdb->result);
674    mdb->result = NULL;
675    mdb->status = (dbi_error_flag) 1;                   /* failed */
676    return mdb->status;
677 }
678
679 void my_dbi_free_result(B_DB *mdb)
680 {
681
682    DBI_FIELD_GET *f;
683    db_lock(mdb);
684    if (mdb->result) {
685       Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
686       dbi_result_free(mdb->result);
687    }
688
689    mdb->result = NULL;
690
691    if (mdb->row) {
692       free(mdb->row);
693    }
694
695    /* now is time to free all value return by my_dbi_get_value
696     * this is necessary because libdbi don't free memory return by yours results
697     * and Bacula has some routine wich call more than once time my_dbi_fetch_row
698     *
699     * Using a queue to store all pointer allocate is a good way to free all things
700     * when necessary
701     */
702    foreach_dlist(f, dbi_getvalue_list) {
703       Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
704       free(f->value);
705       free(f);
706    }
707
708    mdb->row = NULL;
709
710    if (mdb->fields) {
711       free(mdb->fields);
712       mdb->fields = NULL;
713    }
714    db_unlock(mdb);
715    Dmsg0(500, "my_dbi_free_result finish\n");
716
717 }
718
719 const char *my_dbi_strerror(B_DB *mdb)
720 {
721    const char *errmsg;
722
723    dbi_conn_error(mdb->db, &errmsg);
724
725    return errmsg;
726 }
727
728 #ifdef HAVE_BATCH_FILE_INSERT
729
730 /*
731  * This can be a bit strang but is the one way to do
732  *
733  * Returns 1 if OK
734  *         0 if failed
735  */
736 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
737 {
738    char *query = "COPY batch FROM STDIN";
739
740    Dmsg0(500, "my_dbi_batch_start started\n");
741
742    switch (mdb->db_type) {
743    case SQL_TYPE_MYSQL:
744       db_lock(mdb);
745       if (my_dbi_query(mdb,
746                               "CREATE TEMPORARY TABLE batch ("
747                                   "FileIndex integer,"
748                                   "JobId integer,"
749                                   "Path blob,"
750                                   "Name blob,"
751                                   "LStat tinyblob,"
752                                   "MD5 tinyblob)") == 1)
753       {
754          Dmsg0(500, "my_dbi_batch_start failed\n");
755          return 1;
756       }
757       db_unlock(mdb);
758       Dmsg0(500, "my_dbi_batch_start finishing\n");
759       return 1;
760       break;
761    case SQL_TYPE_POSTGRESQL:
762
763       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
764                                   "fileindex int,"
765                                   "jobid int,"
766                                   "path varchar,"
767                                   "name varchar,"
768                                   "lstat varchar,"
769                                   "md5 varchar)") == 1)
770       {
771          Dmsg0(500, "my_dbi_batch_start failed\n");
772          return 1;
773       }
774
775       // We are starting a new query.  reset everything.
776       mdb->num_rows     = -1;
777       mdb->row_number   = -1;
778       mdb->field_number = -1;
779
780       my_dbi_free_result(mdb);
781
782       for (int i=0; i < 10; i++) {
783          my_dbi_query(mdb, query);
784          if (mdb->result) {
785             break;
786          }
787          bmicrosleep(5, 0);
788       }
789       if (!mdb->result) {
790          Dmsg1(50, "Query failed: %s\n", query);
791          goto bail_out;
792       }
793
794       mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
795       //mdb->status = DBI_ERROR_NONE;
796
797       if (mdb->status == DBI_ERROR_NONE) {
798          // how many fields in the set?
799          mdb->num_fields = dbi_result_get_numfields(mdb->result);
800          mdb->num_rows   = dbi_result_get_numrows(mdb->result);
801          mdb->status = (dbi_error_flag) 1;
802       } else {
803          Dmsg1(50, "Result status failed: %s\n", query);
804          goto bail_out;
805       }
806
807       Dmsg0(500, "my_postgresql_batch_start finishing\n");
808
809       return mdb->status;
810       break;
811    case SQL_TYPE_SQLITE:
812       db_lock(mdb);
813       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
814                                   "FileIndex integer,"
815                                   "JobId integer,"
816                                   "Path blob,"
817                                   "Name blob,"
818                                   "LStat tinyblob,"
819                                   "MD5 tinyblob)") == 1)
820       {
821          Dmsg0(500, "my_dbi_batch_start failed\n");
822          goto bail_out;
823       }
824       db_unlock(mdb);
825       Dmsg0(500, "my_dbi_batch_start finishing\n");
826       return 1;
827       break;
828    case SQL_TYPE_SQLITE3:
829       db_lock(mdb);
830       if (my_dbi_query(mdb, "CREATE TEMPORARY TABLE batch ("
831                                   "FileIndex integer,"
832                                   "JobId integer,"
833                                   "Path blob,"
834                                   "Name blob,"
835                                   "LStat tinyblob,"
836                                   "MD5 tinyblob)") == 1)
837       {
838          Dmsg0(500, "my_dbi_batch_start failed\n");
839          goto bail_out;
840       }
841       db_unlock(mdb);
842       Dmsg0(500, "my_dbi_batch_start finishing\n");
843       return 1;
844       break;
845    }
846
847 bail_out:
848    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
849    mdb->status = (dbi_error_flag) 0;
850    my_dbi_free_result(mdb);
851    mdb->result = NULL;
852    return mdb->status;
853 }
854
855 /* set error to something to abort operation */
856 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
857 {
858    int res = 0;
859    int count = 30;
860    int (*custom_function)(void*, const char*) = NULL;
861    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
862
863    Dmsg0(500, "my_dbi_batch_end started\n");
864
865    if (!mdb) {                  /* no files ? */
866       return 0;
867    }
868
869    switch (mdb->db_type) {
870    case SQL_TYPE_MYSQL:
871       if(mdb) {
872          mdb->status = (dbi_error_flag) 0;
873       }
874       break;
875    case SQL_TYPE_POSTGRESQL:
876       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
877
878
879       do {
880          res = (*custom_function)(myconn->connection, error);
881       } while (res == 0 && --count > 0);
882
883       if (res == 1) {
884          Dmsg0(500, "ok\n");
885          mdb->status = (dbi_error_flag) 1;
886       }
887
888       if (res <= 0) {
889          Dmsg0(500, "we failed\n");
890          mdb->status = (dbi_error_flag) 0;
891          //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
892        }
893       break;
894    case SQL_TYPE_SQLITE:
895       if(mdb) {
896          mdb->status = (dbi_error_flag) 0;
897       }
898       break;
899    case SQL_TYPE_SQLITE3:
900       if(mdb) {
901          mdb->status = (dbi_error_flag) 0;
902       }
903       break;
904    }
905
906    Dmsg0(500, "my_dbi_batch_end finishing\n");
907
908    return true;
909 }
910
911 /*
912  * This function is big and use a big switch.
913  * In near future is better split in small functions
914  * and refactory.
915  *
916  */
917 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
918 {
919    int res;
920    int count=30;
921    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
922    int (*custom_function)(void*, const char*, int) = NULL;
923    char* (*custom_function_error)(void*) = NULL;
924    size_t len;
925    char *digest;
926    char ed1[50];
927
928    Dmsg0(500, "my_dbi_batch_insert started \n");
929
930    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
931    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
932
933    if (ar->Digest == NULL || ar->Digest[0] == 0) {
934       digest = "0";
935    } else {
936       digest = ar->Digest;
937    }
938
939    switch (mdb->db_type) {
940    case SQL_TYPE_MYSQL:
941       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
942       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
943       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
944                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
945                       mdb->esc_name, ar->attr, digest);
946
947       if (my_dbi_query(mdb,mdb->cmd) == 1)
948       {
949          Dmsg0(500, "my_dbi_batch_insert failed\n");
950          goto bail_out;
951       }
952
953       Dmsg0(500, "my_dbi_batch_insert finishing\n");
954
955       return 1;
956       break;
957    case SQL_TYPE_POSTGRESQL:
958       my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
959       my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
960       len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n",
961                      ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path,
962                      mdb->esc_name, ar->attr, digest);
963
964       /* libdbi don't support CopyData and we need call a postgresql
965        * specific function to do this work
966        */
967       Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
968       if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
969             "PQputCopyData")) != NULL) {
970          do {
971             res = (*custom_function)(myconn->connection, mdb->cmd, len);
972          } while (res == 0 && --count > 0);
973
974          if (res == 1) {
975             Dmsg0(500, "ok\n");
976             mdb->changes++;
977             mdb->status = (dbi_error_flag) 1;
978          }
979
980          if (res <= 0) {
981             Dmsg0(500, "my_dbi_batch_insert failed\n");
982             goto bail_out;
983          }
984
985          Dmsg0(500, "my_dbi_batch_insert finishing\n");
986          return mdb->status;
987       } else {
988          // ensure to detect a PQerror
989          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
990          Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
991          goto bail_out;
992       }
993       break;
994    case SQL_TYPE_SQLITE:
995       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
996       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
997       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
998                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
999                       mdb->esc_name, ar->attr, digest);
1000       if (my_dbi_query(mdb,mdb->cmd) == 1)
1001       {
1002          Dmsg0(500, "my_dbi_batch_insert failed\n");
1003          goto bail_out;
1004       }
1005
1006       Dmsg0(500, "my_dbi_batch_insert finishing\n");
1007
1008       return 1;
1009       break;
1010    case SQL_TYPE_SQLITE3:
1011       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
1012       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
1013       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
1014                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
1015                       mdb->esc_name, ar->attr, digest);
1016       if (my_dbi_query(mdb,mdb->cmd) == 1)
1017       {
1018          Dmsg0(500, "my_dbi_batch_insert failed\n");
1019          goto bail_out;
1020       }
1021
1022       Dmsg0(500, "my_dbi_batch_insert finishing\n");
1023
1024       return 1;
1025       break;
1026    }
1027
1028 bail_out:
1029   Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
1030   mdb->status = (dbi_error_flag) 0;
1031   my_dbi_free_result(mdb);
1032   return mdb->status;
1033 }
1034
1035 /*
1036  * Escape strings so that PostgreSQL is happy on COPY
1037  *
1038  *   NOTE! len is the length of the old string. Your new
1039  *         string must be long enough (max 2*old+1) to hold
1040  *         the escaped output.
1041  */
1042 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
1043 {
1044    /* we have to escape \t, \n, \r, \ */
1045    char c = '\0' ;
1046
1047    while (len > 0 && *src) {
1048       switch (*src) {
1049       case '\n':
1050          c = 'n';
1051          break;
1052       case '\\':
1053          c = '\\';
1054          break;
1055       case '\t':
1056          c = 't';
1057          break;
1058       case '\r':
1059          c = 'r';
1060          break;
1061       default:
1062          c = '\0' ;
1063       }
1064
1065       if (c) {
1066          *dest = '\\';
1067          dest++;
1068          *dest = c;
1069       } else {
1070          *dest = *src;
1071       }
1072
1073       len--;
1074       src++;
1075       dest++;
1076    }
1077
1078    *dest = '\0';
1079    return dest;
1080 }
1081
1082 #endif /* HAVE_BATCH_FILE_INSERT */
1083
1084 /* my_dbi_getisnull
1085  * like PQgetisnull
1086  * int PQgetisnull(const PGresult *res,
1087  *              int row_number,
1088  *               int column_number);
1089  *
1090  *  use dbi_result_seek_row to search in result set
1091  */
1092 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1093    int i;
1094
1095    if(row_number == 0) {
1096       row_number++;
1097    }
1098
1099    column_number++;
1100
1101    if(dbi_result_seek_row(result, row_number)) {
1102
1103       i = dbi_result_field_is_null_idx(result,column_number);
1104
1105       return i;
1106    } else {
1107
1108       return 0;
1109    }
1110
1111 }
1112 /* my_dbi_getvalue
1113  * like PQgetvalue;
1114  * char *PQgetvalue(const PGresult *res,
1115  *                int row_number,
1116  *                int column_number);
1117  *
1118  * use dbi_result_seek_row to search in result set
1119  * use example to return only strings
1120  */
1121 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1122
1123    char *buf = NULL;
1124    const char *errmsg;
1125    const char *field_name;
1126    unsigned short dbitype;
1127    size_t field_length;
1128    int64_t num;
1129
1130    /* correct the index for dbi interface
1131     * dbi index begins 1
1132     * I prefer do not change others functions
1133     */
1134    Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n",
1135                                 result, row_number, column_number);
1136
1137    column_number++;
1138
1139    if(row_number == 0) {
1140      row_number++;
1141    }
1142
1143    Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n",
1144                         result, row_number, column_number);
1145
1146    if(dbi_result_seek_row(result, row_number)) {
1147
1148       field_name = dbi_result_get_field_name(result, column_number);
1149       field_length = dbi_result_get_field_length(result, field_name);
1150       dbitype = dbi_result_get_field_type_idx(result,column_number);
1151
1152       Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
1153             "field_length bytes: '%d' fieldname: '%s'\n",
1154             dbitype, field_length, field_name);
1155
1156       if(field_length) {
1157          //buf = (char *)malloc(sizeof(char *) * field_length + 1);
1158          buf = (char *)malloc(field_length + 1);
1159       } else {
1160          /* if numbers */
1161          buf = (char *)malloc(sizeof(char *) * 50);
1162       }
1163
1164       switch (dbitype) {
1165       case DBI_TYPE_INTEGER:
1166          num = dbi_result_get_longlong(result, field_name);
1167          edit_int64(num, buf);
1168          field_length = strlen(buf);
1169          break;
1170       case DBI_TYPE_STRING:
1171          if(field_length) {
1172             field_length = bsnprintf(buf, field_length + 1, "%s",
1173             dbi_result_get_string(result, field_name));
1174          } else {
1175             buf[0] = 0;
1176          }
1177          break;
1178       case DBI_TYPE_BINARY:
1179          /* dbi_result_get_binary return a NULL pointer if value is empty
1180          * following, change this to what Bacula espected
1181          */
1182          if(field_length) {
1183             field_length = bsnprintf(buf, field_length + 1, "%s",
1184                   dbi_result_get_binary(result, field_name));
1185          } else {
1186             buf[0] = 0;
1187          }
1188          break;
1189       case DBI_TYPE_DATETIME:
1190          time_t last;
1191          struct tm tm;
1192
1193          last = dbi_result_get_datetime(result, field_name);
1194
1195          if(last == -1) {
1196                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00");
1197          } else {
1198             (void)localtime_r(&last, &tm);
1199             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1200                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1201                   tm.tm_hour, tm.tm_min, tm.tm_sec);
1202          }
1203          break;
1204       }
1205
1206    } else {
1207       dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1208       Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1209    }
1210
1211    Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n",
1212       buf, field_length, buf);
1213
1214    // don't worry about this buf
1215    return buf;
1216 }
1217
1218 int my_dbi_sql_insert_id(B_DB *mdb, char *table_name)
1219 {
1220    /*
1221     Obtain the current value of the sequence that
1222     provides the serial value for primary key of the table.
1223
1224     currval is local to our session.  It is not affected by
1225     other transactions.
1226
1227     Determine the name of the sequence.
1228     PostgreSQL automatically creates a sequence using
1229     <table>_<column>_seq.
1230     At the time of writing, all tables used this format for
1231     for their primary key: <table>id
1232     Except for basefiles which has a primary key on baseid.
1233     Therefore, we need to special case that one table.
1234
1235     everything else can use the PostgreSQL formula.
1236    */
1237
1238    char      sequence[30];
1239    uint64_t    id = 0;
1240
1241    if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
1242
1243       if (strcasecmp(table_name, "basefiles") == 0) {
1244          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1245       } else {
1246          bstrncpy(sequence, table_name, sizeof(sequence));
1247          bstrncat(sequence, "_",        sizeof(sequence));
1248          bstrncat(sequence, table_name, sizeof(sequence));
1249          bstrncat(sequence, "id",       sizeof(sequence));
1250       }
1251
1252       bstrncat(sequence, "_seq", sizeof(sequence));
1253       id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1254    } else {
1255       id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
1256    }
1257
1258    return id;
1259 }
1260
1261 #ifdef HAVE_BATCH_FILE_INSERT
1262 const char *my_dbi_batch_lock_path_query[5] = {
1263    /* Mysql */
1264    "LOCK TABLES Path write, batch write, Path as p write",
1265    /* Postgresql */
1266    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
1267    /* SQLite */
1268    "BEGIN",
1269    /* SQLite3 */
1270    "BEGIN",
1271    /* Ingres (TODO) */
1272    "BEGIN"
1273 };
1274
1275 const char *my_dbi_batch_lock_filename_query[5] = {
1276    /* Mysql */
1277    "LOCK TABLES Filename write, batch write, Filename as f write",
1278    /* Postgresql */
1279    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
1280    /* SQLite */
1281    "BEGIN",
1282    /* SQLite3 */
1283    "BEGIN",
1284    /* Ingres (TODO) */
1285    "BEGIN"
1286 };
1287
1288 const char *my_dbi_batch_unlock_tables_query[5] = {
1289    /* Mysql */
1290    "UNLOCK TABLES",
1291    /* Postgresql */
1292    "COMMIT",
1293    /* SQLite */
1294    "COMMIT",
1295    /* SQLite3 */
1296    "COMMIT",
1297    /* Ingres */
1298    "COMMIT"
1299 };
1300
1301 const char *my_dbi_batch_fill_path_query[5] = {
1302    /* Mysql */
1303    "INSERT INTO Path (Path) "
1304    "SELECT a.Path FROM "
1305    "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1306    "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1307    /* Postgresql */
1308    "INSERT INTO Path (Path) "
1309    "SELECT a.Path FROM "
1310    "(SELECT DISTINCT Path FROM batch) AS a "
1311    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1312    /* SQLite */
1313    "INSERT INTO Path (Path)"
1314    " SELECT DISTINCT Path FROM batch"
1315    " EXCEPT SELECT Path FROM Path",
1316    /* SQLite3 */
1317    "INSERT INTO Path (Path)"
1318    " SELECT DISTINCT Path FROM batch"
1319    " EXCEPT SELECT Path FROM Path",
1320    /* Ingres (TODO) */
1321    "INSERT INTO Path (Path) "
1322    "SELECT a.Path FROM "
1323    "(SELECT DISTINCT Path FROM batch) AS a "
1324    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) "
1325 };
1326
1327 const char *my_dbi_batch_fill_filename_query[5] = {
1328    /* Mysql */
1329    "INSERT INTO Filename (Name) "
1330    "SELECT a.Name FROM "
1331    "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1332    "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1333    /* Postgresql */
1334    "INSERT INTO Filename (Name) "
1335    "SELECT a.Name FROM "
1336    "(SELECT DISTINCT Name FROM batch) as a "
1337    "WHERE NOT EXISTS "
1338    "(SELECT Name FROM Filename WHERE Name = a.Name)",
1339    /* SQLite */
1340    "INSERT INTO Filename (Name)"
1341    " SELECT DISTINCT Name FROM batch "
1342    " EXCEPT SELECT Name FROM Filename",
1343    /* SQLite3 */
1344    "INSERT INTO Filename (Name)"
1345    " SELECT DISTINCT Name FROM batch "
1346    " EXCEPT SELECT Name FROM Filename",
1347    /* Ingres (TODO) */
1348    "INSERT INTO Filename (Name) "
1349    "SELECT a.Name FROM "
1350    "(SELECT DISTINCT Name FROM batch) as a "
1351    "WHERE NOT EXISTS "
1352    "(SELECT Name FROM Filename WHERE Name = a.Name)"
1353 };
1354
1355 #endif /* HAVE_BATCH_FILE_INSERT */
1356
1357 const char *my_dbi_match[4] = {
1358    /* Mysql */
1359    "MATCH",
1360    /* Postgresql */
1361    "~",
1362    /* SQLite */
1363    "MATCH",
1364    /* SQLite3 */
1365    "MATCH"
1366 };
1367
1368 #endif /* HAVE_DBI */