]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
Integrate the libdbi changes from Joao Henrique Freitas
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  *    Version $Id$
37  */
38
39
40 /* The following is necessary so that we do not include
41  * the dummy external definition of DB.
42  */
43 #define __SQL_C                       /* indicate that this is sql.c */
44
45 #include "bacula.h"
46 #include "cats.h"
47
48 #ifdef HAVE_DBI
49
50 /* -----------------------------------------------------------------------
51  *
52  *   DBI dependent defines and subroutines
53  *
54  * -----------------------------------------------------------------------
55  */
56
57 /* List of open databases */
58 static BQUEUE db_list = {&db_list, &db_list};
59
60 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
61
62 /*
63  * Retrieve database type
64  */
65 const char *
66 db_get_type(void)
67 {
68    return "DBI";
69 }
70
71 /*
72  * Initialize database data structure. In principal this should
73  * never have errors, or it is really fatal.
74  */
75 B_DB *
76 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
77                  const char *db_address, int db_port, const char *db_socket, 
78                  int mult_db_connections)
79 {
80    B_DB *mdb;
81    char db_driver[10];
82    char db_driverdir[256];
83
84    /* Constraint the db_driver */
85    if(db_type  == -1) {
86       Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
87       return NULL;
88    }
89    
90    /* Do the correct selection of driver. 
91     * Can be one of the varius supported by libdbi 
92     */
93    switch (db_type) {
94    case SQL_TYPE_MYSQL:
95       bstrncpy(db_driver,"mysql", sizeof(db_driver));
96       break;
97    case SQL_TYPE_POSTGRESQL:
98       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
99       break;
100    case SQL_TYPE_SQLITE:
101       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
102       break;
103    }
104    
105    /* Set db_driverdir whereis is the libdbi drivers */
106    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
107    
108    if (!db_user) {
109       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
110       return NULL;
111    }
112    P(mutex);                          /* lock DB queue */
113    if (!mult_db_connections) {
114       /* Look to see if DB already open */
115       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
116          if (bstrcmp(mdb->db_name, db_name) &&
117              bstrcmp(mdb->db_address, db_address) &&
118              bstrcmp(mdb->db_driver, db_driver) &&
119              mdb->db_port == db_port) { 
120             Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name, 
121                   dbi_conn_error(mdb->db, NULL));
122             mdb->ref_count++;
123             V(mutex);
124             return mdb;                  /* already open */
125          }
126       }
127    }
128    Dmsg0(100, "db_open first time\n");
129    mdb = (B_DB *)malloc(sizeof(B_DB));
130    memset(mdb, 0, sizeof(B_DB));
131    mdb->db_name = bstrdup(db_name);
132    mdb->db_user = bstrdup(db_user);
133    if (db_password) {
134       mdb->db_password = bstrdup(db_password);
135    }
136    if (db_address) {
137       mdb->db_address  = bstrdup(db_address);
138    }
139    if (db_socket) {
140       mdb->db_socket   = bstrdup(db_socket);
141    }
142    if (db_driverdir) {
143       mdb->db_driverdir = bstrdup(db_driverdir);
144    }
145    if (db_driver) {
146       mdb->db_driver    = bstrdup(db_driver);
147    }
148    mdb->db_type        = db_type;
149    mdb->db_port        = db_port;
150    mdb->have_insert_id = TRUE;
151    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
152    *mdb->errmsg        = 0;
153    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
154    mdb->cached_path    = get_pool_memory(PM_FNAME);
155    mdb->cached_path_id = 0;
156    mdb->ref_count      = 1;
157    mdb->fname          = get_pool_memory(PM_FNAME);
158    mdb->path           = get_pool_memory(PM_FNAME);
159    mdb->esc_name       = get_pool_memory(PM_FNAME);
160    mdb->esc_path      = get_pool_memory(PM_FNAME);
161    mdb->allow_transactions = mult_db_connections;
162    qinsert(&db_list, &mdb->bq);            /* put db in list */
163    V(mutex);
164    return mdb;
165 }
166
167 /*
168  * Now actually open the database.  This can generate errors,
169  *   which are returned in the errmsg
170  *
171  * DO NOT close the database or free(mdb) here  !!!!
172  */
173 int
174 db_open_database(JCR *jcr, B_DB *mdb)
175 {   
176    int errstat;
177    int dbstat;
178    const char *errmsg;
179    char buf[10], *port;
180    int numdrivers; 
181    
182    P(mutex);
183    if (mdb->connected) {
184       V(mutex);
185       return 1;
186    }
187    mdb->connected = false;
188
189    if ((errstat=rwl_init(&mdb->lock)) != 0) {
190       berrno be;
191       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
192             be.bstrerror(errstat));
193       V(mutex);
194       return 0;
195    }
196
197    if (mdb->db_port) {
198       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
199       port = buf;
200    } else {
201       port = NULL;
202    }
203    
204    numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
205    if (numdrivers < 0) {
206       Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
207                                "db_driverdir=%s. It is probaly not found any drivers\n"),
208                                mdb->db_driverdir,numdrivers);
209       V(mutex);
210       return 0;
211    }
212    mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
213    dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
214    dbi_conn_set_option(mdb->db, "port", port);            /* default port */
215    dbi_conn_set_option(mdb->db, "username", mdb->db_user);     /* login name */
216    dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
217    dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);       /* database name */
218       
219    /* If connection fails, try at 5 sec intervals for 30 seconds. */
220    for (int retry=0; retry < 6; retry++) {
221          
222       dbstat = dbi_conn_connect(mdb->db); 
223       if ( dbstat == 0) {
224          break;
225       }
226          
227       dbi_conn_error(mdb->db, &errmsg);
228       Dmsg1(50, "dbi error: %s\n", errmsg);
229                    
230       bmicrosleep(5, 0);
231          
232    }
233    
234    if ( dbstat != 0 ) {
235       Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface.\n"
236                        "Type=%s Database=%s User=%s\n"
237                        "It is probably not running or your password is incorrect.\n"),
238                         mdb->db_driver, mdb->db_name, mdb->db_user);
239       V(mutex);
240       return 0;           
241    } 
242    
243    Dmsg0(50, "dbi_real_connect done\n");
244    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
245                     mdb->db_user, mdb->db_name,
246                     mdb->db_password==NULL?"(NULL)":mdb->db_password);
247
248    mdb->connected = true;
249
250    if (!check_tables_version(jcr, mdb)) {
251       V(mutex);
252       return 0;
253    }
254    
255    switch (mdb->db_type) {
256    case SQL_TYPE_MYSQL:
257       /* Set connection timeout to 8 days specialy for batch mode */
258       sql_query(mdb, "SET wait_timeout=691200");
259       sql_query(mdb, "SET interactive_timeout=691200");
260       break;
261    case SQL_TYPE_POSTGRESQL:
262       /* tell PostgreSQL we are using standard conforming strings
263          and avoid warnings such as:
264          WARNING:  nonstandard use of \\ in a string literal
265       */
266       sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
267       sql_query(mdb, "set standard_conforming_strings=on");
268       break;
269    case SQL_TYPE_SQLITE:
270       break;
271    }
272    
273    V(mutex);
274    return 1;
275 }
276
277 void
278 db_close_database(JCR *jcr, B_DB *mdb)
279 {
280    if (!mdb) {
281       return;
282    }
283    db_end_transaction(jcr, mdb);
284    P(mutex);
285    sql_free_result(mdb);
286    mdb->ref_count--;
287    if (mdb->ref_count == 0) {
288       qdchain(&mdb->bq);
289       if (mdb->connected && mdb->db) {
290          //sql_close(mdb);
291          dbi_shutdown_r(mdb->instance);
292          mdb->db = NULL;
293          mdb->instance = NULL;
294       }
295       rwl_destroy(&mdb->lock);
296       free_pool_memory(mdb->errmsg);
297       free_pool_memory(mdb->cmd);
298       free_pool_memory(mdb->cached_path);
299       free_pool_memory(mdb->fname);
300       free_pool_memory(mdb->path);
301       free_pool_memory(mdb->esc_name);
302       free_pool_memory(mdb->esc_path);
303       if (mdb->db_name) {
304          free(mdb->db_name);
305       }
306       if (mdb->db_user) {
307          free(mdb->db_user);
308       }
309       if (mdb->db_password) {
310          free(mdb->db_password);
311       }
312       if (mdb->db_address) {
313          free(mdb->db_address);
314       }
315       if (mdb->db_socket) {
316          free(mdb->db_socket);
317       }
318       if (mdb->db_driverdir) {
319          free(mdb->db_driverdir);
320       }
321       if (mdb->db_driver) {
322           free(mdb->db_driver);
323       }
324       free(mdb);            
325    }   
326    V(mutex);
327 }
328
329 void db_thread_cleanup()
330 { }
331
332 /*
333  * Return the next unique index (auto-increment) for
334  * the given table.  Return NULL on error.
335  *
336  */
337 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
338 {
339    strcpy(index, "NULL");
340    return 1;
341 }
342
343
344 /*
345  * Escape strings so that DBI is happy
346  *
347  *   NOTE! len is the length of the old string. Your new
348  *         string must be long enough (max 2*old+1) to hold
349  *         the escaped output.
350  * 
351  * dbi_conn_quote_string_copy receives a pointer to pointer.
352  * We need copy the value of pointer to snew because libdbi change the 
353  * pointer
354  */
355 void
356 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
357 {
358    char *inew;
359    char *pnew;
360    
361    if (len == 0) {
362       snew[0] = 0; 
363    } else {
364       /* correct the size of old basead in len
365        * and copy new string to inew
366        */
367       inew = (char *)malloc(sizeof(char) * len + 1);
368       bstrncpy(inew,old,len + 1);
369       /* escape the correct size of old */
370       dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
371       free(inew);
372       /* copy the escaped string to snew */
373       bstrncpy(snew, pnew, 2 * len + 1);   
374    }
375    
376    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
377    
378 }
379
380 /*
381  * Submit a general SQL command (cmd), and for each row returned,
382  *  the sqlite_handler is called with the ctx.
383  */
384 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
385 {
386    SQL_ROW row;
387
388    Dmsg0(500, "db_sql_query started\n");
389
390    db_lock(mdb);
391    if (sql_query(mdb, query) != 0) {
392       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
393       db_unlock(mdb);
394       Dmsg0(500, "db_sql_query failed\n");
395       return false;
396    }
397    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
398
399    if (result_handler != NULL) {
400       Dmsg0(500, "db_sql_query invoking handler\n");
401       if ((mdb->result = sql_store_result(mdb)) != NULL) {
402          int num_fields = sql_num_fields(mdb);
403
404          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
405          while ((row = sql_fetch_row(mdb)) != NULL) {
406
407             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
408             if (result_handler(ctx, num_fields, row))
409                break;
410          }
411
412         sql_free_result(mdb);
413       }
414    }
415    db_unlock(mdb);
416
417    Dmsg0(500, "db_sql_query finished\n");
418
419    return true;
420 }
421
422
423
424 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
425 {
426    int j;
427    DBI_ROW row = NULL; // by default, return NULL
428
429    Dmsg0(500, "my_dbi_fetch_row start\n");
430
431    if (!mdb->row || mdb->row_size < mdb->num_fields) {
432       int num_fields = mdb->num_fields;
433       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
434
435       if (mdb->row) {
436          Dmsg0(500, "my_dbi_fetch_row freeing space\n");
437          Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
438          if (mdb->num_rows != 0) {
439             for(j = 0; j < mdb->num_fields; j++) {
440                Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);
441                   if(mdb->row[j]) {
442                      free(mdb->row[j]);
443                   }                  
444             } 
445          }
446          free(mdb->row);
447       }
448       num_fields += 20;                  /* add a bit extra */
449       mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
450       mdb->row_size = num_fields;
451
452       // now reset the row_number now that we have the space allocated
453       mdb->row_number = 1;
454    }
455
456    // if still within the result set
457    if (mdb->row_number <= mdb->num_rows) {
458       Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
459       // get each value from this row
460       for (j = 0; j < mdb->num_fields; j++) {
461          mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
462          Dmsg3(500, "my_dbi_fetch_row field '%p' '%d' has value '%s'\n",mdb->row[j], j, mdb->row[j]);
463       }
464       // increment the row number for the next call
465       mdb->row_number++;
466
467       row = mdb->row;
468    } else {
469       Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
470    }
471
472    Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
473
474    return row;
475 }
476
477 int my_dbi_max_length(B_DB *mdb, int field_num) {
478    //
479    // for a given column, find the max length
480    //
481    int max_length;
482    int i;
483    int this_length;
484    char *cbuf = NULL;
485
486    max_length = 0;
487    for (i = 0; i < mdb->num_rows; i++) {
488       if (my_dbi_getisnull(mdb->result, i, field_num)) {
489           this_length = 4;        // "NULL"
490       } else {
491           // TODO: error
492          cbuf = my_dbi_getvalue(mdb->result, i, field_num);
493          this_length = cstrlen(cbuf);
494          free(cbuf);
495       }
496
497       if (max_length < this_length) {
498           max_length = this_length;
499       }
500    }
501
502    return max_length;
503 }
504
505 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
506 {
507    int     i;
508    int     dbi_index;
509
510    Dmsg0(500, "my_dbi_fetch_field starts\n");
511
512    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
513       if (mdb->fields) {
514          free(mdb->fields);
515       }
516       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
517       mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
518       mdb->fields_size = mdb->num_fields;
519
520       for (i = 0; i < mdb->num_fields; i++) {
521          dbi_index = i + 1;
522          Dmsg1(500, "filling field %d\n", i);
523          mdb->fields[i].name       = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
524          mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
525          mdb->fields[i].type       = dbi_result_get_field_type_idx(mdb->result, dbi_index);
526          mdb->fields[i].flags      = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
527
528          Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
529             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
530             mdb->fields[i].flags);
531       } // end for
532    } // end if
533
534    // increment field number for the next time around
535
536    Dmsg0(500, "my_dbi_fetch_field finishes\n");
537    return &mdb->fields[mdb->field_number++];
538 }
539
540 void my_dbi_data_seek(B_DB *mdb, int row)
541 {
542    // set the row number to be returned on the next call
543    // to my_dbi_fetch_row
544    mdb->row_number = row;
545 }
546
547 void my_dbi_field_seek(B_DB *mdb, int field)
548 {
549    mdb->field_number = field;
550 }
551
552 /*
553  * Note, if this routine returns 1 (failure), Bacula expects
554  *  that no result has been stored.
555  *
556  *  Returns:  0  on success
557  *            1  on failure
558  *
559  */
560 int my_dbi_query(B_DB *mdb, const char *query)
561 {
562    const char *errmsg;
563    Dmsg1(500, "my_dbi_query started %s\n", query);
564    // We are starting a new query.  reset everything.
565    mdb->num_rows     = -1;
566    mdb->row_number   = -1;
567    mdb->field_number = -1;
568
569    if (mdb->result) {
570       dbi_result_free(mdb->result);  /* hmm, someone forgot to free?? */
571       mdb->result = NULL;
572    }
573          
574    mdb->result = (void **)dbi_conn_query(mdb->db, query);
575       
576    if (!mdb->result) {
577       Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);      
578       goto bail_out;
579    }
580    
581    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
582    
583    if (mdb->status == DBI_ERROR_NONE) {
584       Dmsg1(500, "we have a result\n", query);
585
586       // how many fields in the set?
587       mdb->num_fields = dbi_result_get_numfields(mdb->result);
588       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
589
590       mdb->num_rows = dbi_result_get_numrows(mdb->result);
591       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
592
593       mdb->status = (dbi_error_flag) 0;                  /* succeed */
594    } else {
595       Dmsg1(50, "Result status failed: %s\n", query);
596       goto bail_out;
597    }
598
599    Dmsg0(500, "my_dbi_query finishing\n");
600    return mdb->status;
601
602 bail_out:
603    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
604    //dbi_conn_error(mdb->db, &errmsg);
605    Dmsg4(500, "my_dbi_query we failed dbi error: "
606                    "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
607    dbi_result_free(mdb->result);
608    mdb->result = NULL;
609    mdb->status = (dbi_error_flag) 1;                   /* failed */
610    return mdb->status;
611 }
612
613 void my_dbi_free_result(B_DB *mdb)
614
615    
616    db_lock(mdb);  
617    int i = 0;
618    if (mdb->result) {
619       Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
620       dbi_result_free(mdb->result);
621    }
622
623    mdb->result = NULL;
624    
625    if (mdb->row) {
626       Dmsg2(500, "my_dbi_free_result row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
627       if (mdb->num_rows != 0) {
628          for(i = 0; i < mdb->num_fields; i++) {
629             Dmsg2(500, "my_dbi_free_result row '%p' '%d'\n", mdb->row[i], i);
630             if(mdb->row[i]) {
631                free(mdb->row[i]);
632             }
633          } 
634       }
635       free(mdb->row);
636       mdb->row = NULL;
637    }
638
639    if (mdb->fields) {
640       free(mdb->fields);
641       mdb->fields = NULL;
642    }
643    db_unlock(mdb);
644    //Dmsg0(500, "my_dbi_free_result finish\n");
645    
646 }
647
648 const char *my_dbi_strerror(B_DB *mdb) 
649 {
650    const char *errmsg;
651    
652    dbi_conn_error(mdb->db, &errmsg);
653         
654    return errmsg;
655 }
656
657 #ifdef HAVE_BATCH_FILE_INSERT
658
659 /*
660  * This can be a bit strang but is the one way to do
661  * 
662  * Returns 1 if OK
663  *         0 if failed
664  */
665 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
666 {
667    char *query = "COPY batch FROM STDIN";
668    
669    Dmsg0(500, "my_dbi_batch_start started\n");
670    
671    switch (mdb->db_type) {
672    case SQL_TYPE_MYSQL:
673       db_lock(mdb);
674       if (my_dbi_query(mdb,
675                               "CREATE TEMPORARY TABLE batch ("
676                                   "FileIndex integer,"
677                                   "JobId integer,"
678                                   "Path blob,"
679                                   "Name blob,"
680                                   "LStat tinyblob,"
681                                   "MD5 tinyblob)") == 1)
682       {
683          Dmsg0(500, "my_dbi_batch_start failed\n");
684          return 1;
685       }
686       db_unlock(mdb);
687       Dmsg0(500, "my_dbi_batch_start finishing\n");
688       return 1;
689       break;
690    case SQL_TYPE_POSTGRESQL:
691       
692       //query = "COPY batch FROM STDIN";
693
694       if (my_dbi_query(mdb,
695                               "CREATE TEMPORARY TABLE batch ("
696                                   "fileindex int,"
697                                   "jobid int,"
698                                   "path varchar,"
699                                   "name varchar,"
700                                   "lstat varchar,"
701                                   "md5 varchar)") == 1)
702       {
703          Dmsg0(500, "my_dbi_batch_start failed\n");
704          return 1;
705       }
706       
707       // We are starting a new query.  reset everything.
708       mdb->num_rows     = -1;
709       mdb->row_number   = -1;
710       mdb->field_number = -1;
711
712       my_dbi_free_result(mdb);
713
714       for (int i=0; i < 10; i++) {
715          my_dbi_query(mdb, query);
716          if (mdb->result) {
717             break;
718          }
719          bmicrosleep(5, 0);
720       }
721       if (!mdb->result) {
722          Dmsg1(50, "Query failed: %s\n", query);
723          goto bail_out;
724       }
725
726       mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
727       //mdb->status = DBI_ERROR_NONE;
728          
729       if (mdb->status == DBI_ERROR_NONE) {
730          // how many fields in the set?
731          mdb->num_fields = dbi_result_get_numfields(mdb->result);
732          mdb->num_rows   = dbi_result_get_numrows(mdb->result);
733          mdb->status = (dbi_error_flag) 1;
734       } else {
735          Dmsg1(50, "Result status failed: %s\n", query);
736          goto bail_out;
737       }
738
739       Dmsg0(500, "my_postgresql_batch_start finishing\n");
740
741       return mdb->status;
742       break;
743    case SQL_TYPE_SQLITE:
744       db_lock(mdb);
745       if (my_dbi_query(mdb,
746                               "CREATE TEMPORARY TABLE batch ("
747                                   "FileIndex integer,"
748                                   "JobId integer,"
749                                   "Path blob,"
750                                   "Name blob,"
751                                   "LStat tinyblob,"
752                                   "MD5 tinyblob)") == 1)
753       {
754          Dmsg0(500, "my_dbi_batch_start failed\n");
755          goto bail_out;
756       }
757       db_unlock(mdb);
758       Dmsg0(500, "my_dbi_batch_start finishing\n");
759       return 1;
760       break;
761    }
762    
763 bail_out:
764    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
765    mdb->status = (dbi_error_flag) 0;
766    my_dbi_free_result(mdb);
767    mdb->result = NULL;
768    return mdb->status;
769 }
770
771 /* set error to something to abort operation */
772 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
773 {
774    int res = 0;
775    int count = 30;
776    int (*custom_function)(void*, const char*) = NULL;  
777    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
778    
779    Dmsg0(500, "my_dbi_batch_end started\n");
780
781    if (!mdb) {                  /* no files ? */
782       return 0;
783    }
784
785    switch (mdb->db_type) {
786    case SQL_TYPE_MYSQL:
787       if(mdb) {
788          mdb->status = (dbi_error_flag) 0;
789       }
790       break;
791    case SQL_TYPE_POSTGRESQL:
792       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
793
794                   
795       do { 
796          res = (*custom_function)(myconn->connection, error);         
797       } while (res == 0 && --count > 0);
798
799       if (res == 1) {
800          Dmsg0(500, "ok\n");
801          mdb->status = (dbi_error_flag) 1;
802       }
803          
804       if (res <= 0) {
805          Dmsg0(500, "we failed\n");
806          mdb->status = (dbi_error_flag) 0;
807          //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
808        }
809       break;
810    case SQL_TYPE_SQLITE:
811       if(mdb) {
812          mdb->status = (dbi_error_flag) 0;
813       }
814       break;
815    }
816
817    Dmsg0(500, "my_dbi_batch_end finishing\n");
818
819    return true;      
820 }
821
822 /* 
823  * This function is big and use a big switch.  
824  * In near future is better split in small functions
825  * and refactory.
826  * 
827  */
828 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
829 {
830    int res;
831    int count=30;   
832    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
833    int (*custom_function)(void*, const char*, int) = NULL;
834    char* (*custom_function_error)(void*) = NULL;
835    size_t len;
836    char *digest;
837    char ed1[50];
838
839    Dmsg0(500, "my_dbi_batch_insert started \n");
840
841    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);    
842    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
843
844    if (ar->Digest == NULL || ar->Digest[0] == 0) {
845       digest = "0";
846    } else {
847       digest = ar->Digest;
848    }
849
850    switch (mdb->db_type) {
851    case SQL_TYPE_MYSQL:
852       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
853       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
854       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
855                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path, 
856                       mdb->esc_name, ar->attr, digest);
857       
858       if (my_dbi_query(mdb,mdb->cmd) == 1)
859       {
860          Dmsg0(500, "my_dbi_batch_insert failed\n");
861          goto bail_out;
862       }
863       
864       Dmsg0(500, "my_dbi_batch_insert finishing\n");
865       
866       return 1;
867       break;
868    case SQL_TYPE_POSTGRESQL:
869       my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
870       my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
871       len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
872                      ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
873                      mdb->esc_name, ar->attr, digest);
874       
875       /* libdbi don't support CopyData and we need call a postgresql
876        * specific function to do this work
877        */
878       Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
879       if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
880             "PQputCopyData")) != NULL) {
881          do { 
882             res = (*custom_function)(myconn->connection, mdb->cmd, len);
883          } while (res == 0 && --count > 0);
884
885          if (res == 1) {
886             Dmsg0(500, "ok\n");
887             mdb->changes++;
888             mdb->status = (dbi_error_flag) 1;
889          }
890
891          if (res <= 0) {
892             Dmsg0(500, "my_dbi_batch_insert failed\n");
893             goto bail_out;
894          }
895
896          Dmsg0(500, "my_dbi_batch_insert finishing\n");
897          return mdb->status;
898       } else {
899          // ensure to detect a PQerror 
900          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
901          Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
902          goto bail_out;
903       }
904       break;
905    case SQL_TYPE_SQLITE:
906       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
907       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
908       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
909                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path, 
910                       mdb->esc_name, ar->attr, digest);
911       if (my_dbi_query(mdb,mdb->cmd) == 1)
912       {
913          Dmsg0(500, "my_dbi_batch_insert failed\n");
914          goto bail_out;
915       }
916       
917       Dmsg0(500, "my_dbi_batch_insert finishing\n");
918       
919       return 1;
920       break;
921    }
922     
923 bail_out:
924   Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
925   mdb->status = (dbi_error_flag) 0;
926   my_dbi_free_result(mdb);  
927   return mdb->status;
928 }
929
930 /*
931  * Escape strings so that PostgreSQL is happy on COPY
932  *
933  *   NOTE! len is the length of the old string. Your new
934  *         string must be long enough (max 2*old+1) to hold
935  *         the escaped output.
936  */
937 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
938 {
939    /* we have to escape \t, \n, \r, \ */
940    char c = '\0' ;
941
942    while (len > 0 && *src) {
943       switch (*src) {
944       case '\n':
945          c = 'n';
946          break;
947       case '\\':
948          c = '\\';
949          break;
950       case '\t':
951          c = 't';
952          break;
953       case '\r':
954          c = 'r';
955          break;
956       default:
957          c = '\0' ;
958       }
959
960       if (c) {
961          *dest = '\\';
962          dest++;
963          *dest = c;
964       } else {
965          *dest = *src;
966       }
967
968       len--;
969       src++;
970       dest++;
971    }
972
973    *dest = '\0';
974    return dest;
975 }
976
977 #endif /* HAVE_BATCH_FILE_INSERT */
978
979 /* my_dbi_getisnull
980  * like PQgetisnull
981  * int PQgetisnull(const PGresult *res,
982  *              int row_number,
983  *               int column_number);
984  * 
985  *  use dbi_result_seek_row to search in result set
986  */
987 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
988    int i;
989
990    if(row_number == 0) {
991       row_number++;
992    }
993    
994    column_number++;
995    
996    if(dbi_result_seek_row(result, row_number)) {
997
998       i = dbi_result_field_is_null_idx(result,column_number);
999
1000       return i;
1001    } else {
1002            
1003       return 0;
1004    }
1005                 
1006 }
1007 /* my_dbi_getvalue 
1008  * like PQgetvalue;
1009  * char *PQgetvalue(const PGresult *res,
1010  *                int row_number,
1011  *                int column_number);
1012  *
1013  * use dbi_result_seek_row to search in result set
1014  * use example to return only strings
1015  */
1016 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1017
1018    /* TODO: This is very bad, need refactoring */
1019    //POOLMEM *buf = get_pool_memory(PM_FNAME);
1020    char *buf = NULL;
1021    const char *errmsg;
1022    const char *field_name;     
1023    unsigned short dbitype;
1024    int32_t field_length = 0;
1025    int64_t num;
1026         
1027    /* correct the index for dbi interface
1028     * dbi index begins 1
1029     * I prefer do not change others functions
1030     */
1031    Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n", 
1032                                 result, row_number, column_number);
1033         
1034    column_number++;
1035
1036    if(row_number == 0) {
1037      row_number++;
1038    }
1039       
1040    Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n", 
1041                         result, row_number, column_number);
1042    
1043    if(dbi_result_seek_row(result, row_number)) {
1044
1045       field_name = dbi_result_get_field_name(result, column_number);
1046       field_length = dbi_result_get_field_length(result, field_name);
1047       dbitype = dbi_result_get_field_type_idx(result,column_number);
1048       
1049       if(field_length) {
1050          //buf = check_pool_memory_size(buf, field_length + 1);
1051          buf = (char *)malloc(sizeof(char *) * field_length + 1);
1052       } else {
1053          /* if numbers */
1054          //buf = check_pool_memory_size(buf, 50);
1055          buf = (char *)malloc(sizeof(char *) * 50);
1056       }
1057       
1058       Dmsg4(500, "my_dbi_getvalue result '%p' type '%d' \n\tfield name '%s'\n\t"
1059             "field_length '%d'\n", 
1060             result, dbitype, field_name, field_length);
1061       
1062       switch (dbitype) {
1063       case DBI_TYPE_INTEGER:
1064          num = dbi_result_get_longlong(result, field_name);         
1065          edit_int64(num, buf);
1066          field_length = strlen(buf);
1067          break;
1068       case DBI_TYPE_STRING:
1069          if(field_length) {
1070             field_length = bsnprintf(buf, field_length + 1, "%s", 
1071             dbi_result_get_string(result, field_name));
1072          } else {
1073             buf[0] = 0;
1074          }
1075          break;
1076       case DBI_TYPE_BINARY:
1077          /* dbi_result_get_binary return a NULL pointer if value is empty
1078          * following, change this to what Bacula espected
1079          */
1080          if(field_length) {
1081             field_length = bsnprintf(buf, field_length + 1, "%s", 
1082                   dbi_result_get_binary(result, field_name));
1083          } else {
1084             buf[0] = 0;
1085          }
1086          break;
1087       case DBI_TYPE_DATETIME:
1088          time_t last;
1089          struct tm tm;
1090          
1091          last = dbi_result_get_datetime(result, field_name);
1092          
1093          if(last == -1) {
1094                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00"); 
1095          } else {
1096             (void)localtime_r(&last, &tm);
1097             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1098                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1099                   tm.tm_hour, tm.tm_min, tm.tm_sec);
1100          }
1101          break;
1102       }
1103
1104    } else {
1105       dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1106       Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1107    }
1108                 
1109    Dmsg3(500, "my_dbi_getvalue finish result '%p' num bytes '%d' data '%s'\n", 
1110       result, field_length, buf);
1111    
1112    return buf;
1113 }
1114
1115 //int my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number, char *value) {
1116 //   
1117 //   void *v;
1118 //   const char *errmsg;
1119 //   int error = 0;
1120 //   const char *field_name;     
1121 //   unsigned short dbitype;
1122 //   int32_t field_length = 0;
1123 //   int64_t num;
1124 //
1125 //   /* correct the index for dbi interface
1126 //    * dbi index begins 1
1127 //    * I prefer do not change others functions
1128 //    */
1129 //   Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n", 
1130 //                                result, row_number, column_number);
1131 //        
1132 //   column_number++;
1133 //
1134 //   if(row_number == 0) {
1135 //     row_number++;
1136 //   }
1137 //      
1138 //   Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n", 
1139 //                        result, row_number, column_number);
1140 //   
1141 //   if(dbi_result_seek_row(result, row_number)) {
1142 //
1143 //      field_name = dbi_result_get_field_name(result, column_number);
1144 //      field_length = dbi_result_get_field_length(result, field_name);
1145 //      dbitype = dbi_result_get_field_type_idx(result,column_number);
1146 //
1147 //      Dmsg4(500, "my_dbi_getvalue result '%p' type '%d' \n\tfield name '%s'\n\t"
1148 //            "field_length '%d'\n", result, dbitype, field_name, field_length);
1149 //
1150 //      switch (dbitype) {
1151 //      case DBI_TYPE_INTEGER:
1152 //         v = (int64_t *)malloc(sizeof(int64_t));
1153 //         error = dbi_result_bind_longlong(result, field_name, (int64_t *)v);
1154 //         // transform in string
1155 //         num = *(int64_t *)v;
1156 //         edit_int64(num, value);
1157 //         field_length = strlen(value);
1158 //         break;
1159 //      case DBI_TYPE_STRING:
1160 //         if(field_length) {
1161 //            dbi_result_bind_string(result, field_name, (const char **)v);
1162 //            value = (char *) v;
1163 //         } else {
1164 //            value[0] = 0;
1165 //         }
1166 //         break;
1167 //      case DBI_TYPE_BINARY:
1168 //         if(field_length) {
1169 //            dbi_result_bind_binary(result, field_name, (const unsigned char **)v);
1170 //            value = (char *)v;
1171 //         } else {
1172 //            value[0] = 0;
1173 //         }
1174 //         break;
1175 //      case DBI_TYPE_DATETIME:
1176 //         //time_t last;
1177 //         struct tm tm;
1178 //
1179 //         v = (time_t *)dbi_result_get_datetime(result, field_name);
1180 //         dbi_result_bind_datetime(result, field_name, (time_t *)v);
1181 //         if(!v) {
1182 //                field_length = bsnprintf(value, 20, "0000-00-00 00:00:00"); 
1183 //         } else {
1184 //            (void)localtime_r((time_t *)v, &tm);
1185 //            field_length = bsnprintf(value, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1186 //                  (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1187 //                  tm.tm_hour, tm.tm_min, tm.tm_sec);
1188 //         }
1189 //         break;
1190 //      }
1191 //
1192 //   } else {
1193 //      dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1194 //      Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1195 //   }
1196 //                
1197 //   Dmsg2(500, "my_dbi_getvalue finish result '%p' num bytes '%d'\n",
1198 //      result, field_length);
1199 //   
1200 //   return 1;
1201 //}
1202
1203 int my_dbi_sql_insert_id(B_DB *mdb, char *table_name)
1204 {
1205    /*
1206     Obtain the current value of the sequence that
1207     provides the serial value for primary key of the table.
1208
1209     currval is local to our session.  It is not affected by
1210     other transactions.
1211
1212     Determine the name of the sequence.
1213     PostgreSQL automatically creates a sequence using
1214     <table>_<column>_seq.
1215     At the time of writing, all tables used this format for
1216     for their primary key: <table>id
1217     Except for basefiles which has a primary key on baseid.
1218     Therefore, we need to special case that one table.
1219
1220     everything else can use the PostgreSQL formula.
1221    */
1222    
1223    char      sequence[30];  
1224    uint64_t    id = 0;
1225
1226    if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
1227            
1228       if (strcasecmp(table_name, "basefiles") == 0) {
1229          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1230       } else {
1231          bstrncpy(sequence, table_name, sizeof(sequence));
1232          bstrncat(sequence, "_",        sizeof(sequence));
1233          bstrncat(sequence, table_name, sizeof(sequence));
1234          bstrncat(sequence, "id",       sizeof(sequence));
1235       }
1236
1237       bstrncat(sequence, "_seq", sizeof(sequence));
1238       id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1239    } else {
1240       id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
1241    }
1242    
1243    return id;
1244 }
1245
1246 #ifdef HAVE_BATCH_FILE_INSERT
1247 const char *my_dbi_batch_lock_path_query[3] = {
1248    /* Mysql */
1249    "LOCK TABLES Path write, batch write, Path as p write",
1250    /* Postgresql */
1251    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
1252    /* SQLite */
1253    "BEGIN"};  
1254
1255 const char *my_dbi_batch_lock_filename_query[3] = {
1256    /* Mysql */
1257    "LOCK TABLES Filename write, batch write, Filename as f write",
1258    /* Postgresql */
1259    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
1260    /* SQLite */
1261    "BEGIN"};
1262
1263 const char *my_dbi_batch_unlock_tables_query[3] = {
1264    /* Mysql */
1265    "UNLOCK TABLES",
1266    /* Postgresql */
1267    "COMMIT",
1268    /* SQLite */
1269    "COMMIT"};
1270
1271 const char *my_dbi_batch_fill_path_query[3] = {
1272    /* Mysql */
1273    "INSERT INTO Path (Path) "
1274    "SELECT a.Path FROM " 
1275    "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1276    "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1277    /* Postgresql */
1278    "INSERT INTO Path (Path) "
1279    "SELECT a.Path FROM "
1280    "(SELECT DISTINCT Path FROM batch) AS a "
1281    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1282    /* SQLite */
1283    "INSERT INTO Path (Path)" 
1284    " SELECT DISTINCT Path FROM batch"
1285    " EXCEPT SELECT Path FROM Path"};
1286
1287 const char *my_dbi_batch_fill_filename_query[3] = {
1288    /* Mysql */
1289    "INSERT INTO Filename (Name) "
1290    "SELECT a.Name FROM " 
1291    "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1292    "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1293    /* Postgresql */
1294    "INSERT INTO Filename (Name) "
1295    "SELECT a.Name FROM "
1296    "(SELECT DISTINCT Name FROM batch) as a "
1297    "WHERE NOT EXISTS "
1298    "(SELECT Name FROM Filename WHERE Name = a.Name)",
1299    /* SQLite */
1300    "INSERT INTO Filename (Name)"
1301    " SELECT DISTINCT Name FROM batch "
1302    " EXCEPT SELECT Name FROM Filename"};
1303 #endif /* HAVE_BATCH_FILE_INSERT */
1304
1305 #endif /* HAVE_DBI */