]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
ebl Modify disk-changer to check if slot contains something before
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  *    Version $Id$
37  */
38
39
40 /* The following is necessary so that we do not include
41  * the dummy external definition of DB.
42  */
43 #define __SQL_C                       /* indicate that this is sql.c */
44
45 #include "bacula.h"
46 #include "cats.h"
47
48 #ifdef HAVE_DBI
49
50 /* -----------------------------------------------------------------------
51  *
52  *   DBI dependent defines and subroutines
53  *
54  * -----------------------------------------------------------------------
55  */
56
57 /* List of open databases */
58 static BQUEUE db_list = {&db_list, &db_list};
59
60 /* Control allocated fields by my_dbi_getvalue */
61 static BQUEUE dbi_getvalue_list = {&dbi_getvalue_list, &dbi_getvalue_list};
62
63 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
64
65 /*
66  * Retrieve database type
67  */
68 const char *
69 db_get_type(void)
70 {
71    return "DBI";
72 }
73
74 /*
75  * Initialize database data structure. In principal this should
76  * never have errors, or it is really fatal.
77  */
78 B_DB *
79 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
80                  const char *db_address, int db_port, const char *db_socket, 
81                  int mult_db_connections)
82 {
83    B_DB *mdb;
84    char db_driver[10];
85    char db_driverdir[256];
86
87    /* Constraint the db_driver */
88    if(db_type  == -1) {
89       Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
90       return NULL;
91    }
92    
93    /* Do the correct selection of driver. 
94     * Can be one of the varius supported by libdbi 
95     */
96    switch (db_type) {
97    case SQL_TYPE_MYSQL:
98       bstrncpy(db_driver,"mysql", sizeof(db_driver));
99       break;
100    case SQL_TYPE_POSTGRESQL:
101       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
102       break;
103    case SQL_TYPE_SQLITE:
104       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
105       break;
106    }
107    
108    /* Set db_driverdir whereis is the libdbi drivers */
109    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
110    
111    if (!db_user) {
112       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
113       return NULL;
114    }
115    P(mutex);                          /* lock DB queue */
116    if (!mult_db_connections) {
117       /* Look to see if DB already open */
118       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
119          if (bstrcmp(mdb->db_name, db_name) &&
120              bstrcmp(mdb->db_address, db_address) &&
121              bstrcmp(mdb->db_driver, db_driver) &&
122              mdb->db_port == db_port) { 
123             Dmsg4(100, "DB REopen %d %s %s erro: %d\n", mdb->ref_count, db_driver, db_name, 
124                   dbi_conn_error(mdb->db, NULL));
125             mdb->ref_count++;
126             V(mutex);
127             return mdb;                  /* already open */
128          }
129       }
130    }
131    Dmsg0(100, "db_open first time\n");
132    mdb = (B_DB *)malloc(sizeof(B_DB));
133    memset(mdb, 0, sizeof(B_DB));
134    mdb->db_name = bstrdup(db_name);
135    mdb->db_user = bstrdup(db_user);
136    if (db_password) {
137       mdb->db_password = bstrdup(db_password);
138    }
139    if (db_address) {
140       mdb->db_address  = bstrdup(db_address);
141    }
142    if (db_socket) {
143       mdb->db_socket   = bstrdup(db_socket);
144    }
145    if (db_driverdir) {
146       mdb->db_driverdir = bstrdup(db_driverdir);
147    }
148    if (db_driver) {
149       mdb->db_driver    = bstrdup(db_driver);
150    }
151    mdb->db_type        = db_type;
152    mdb->db_port        = db_port;
153    mdb->have_insert_id = TRUE;
154    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
155    *mdb->errmsg        = 0;
156    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
157    mdb->cached_path    = get_pool_memory(PM_FNAME);
158    mdb->cached_path_id = 0;
159    mdb->ref_count      = 1;
160    mdb->fname          = get_pool_memory(PM_FNAME);
161    mdb->path           = get_pool_memory(PM_FNAME);
162    mdb->esc_name       = get_pool_memory(PM_FNAME);
163    mdb->esc_path      = get_pool_memory(PM_FNAME);
164    mdb->allow_transactions = mult_db_connections;
165    qinsert(&db_list, &mdb->bq);            /* put db in list */
166    V(mutex);
167    return mdb;
168 }
169
170 /*
171  * Now actually open the database.  This can generate errors,
172  *   which are returned in the errmsg
173  *
174  * DO NOT close the database or free(mdb) here  !!!!
175  */
176 int
177 db_open_database(JCR *jcr, B_DB *mdb)
178 {   
179    int errstat;
180    int dbstat;
181    const char *errmsg;
182    char buf[10], *port;
183    int numdrivers; 
184    
185    P(mutex);
186    if (mdb->connected) {
187       V(mutex);
188       return 1;
189    }
190    mdb->connected = false;
191
192    if ((errstat=rwl_init(&mdb->lock)) != 0) {
193       berrno be;
194       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
195             be.bstrerror(errstat));
196       V(mutex);
197       return 0;
198    }
199
200    if (mdb->db_port) {
201       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
202       port = buf;
203    } else {
204       port = NULL;
205    }
206    
207    numdrivers = dbi_initialize_r(mdb->db_driverdir, &(mdb->instance));
208    if (numdrivers < 0) {
209       Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
210                                "db_driverdir=%s. It is probaly not found any drivers\n"),
211                                mdb->db_driverdir,numdrivers);
212       V(mutex);
213       return 0;
214    }
215    mdb->db = (void **)dbi_conn_new_r(mdb->db_driver, mdb->instance);
216    dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
217    dbi_conn_set_option(mdb->db, "port", port);            /* default port */
218    dbi_conn_set_option(mdb->db, "username", mdb->db_user);     /* login name */
219    dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
220    dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);       /* database name */
221       
222    /* If connection fails, try at 5 sec intervals for 30 seconds. */
223    for (int retry=0; retry < 6; retry++) {
224          
225       dbstat = dbi_conn_connect(mdb->db); 
226       if ( dbstat == 0) {
227          break;
228       }
229          
230       dbi_conn_error(mdb->db, &errmsg);
231       Dmsg1(50, "dbi error: %s\n", errmsg);
232                    
233       bmicrosleep(5, 0);
234          
235    }
236    
237    if ( dbstat != 0 ) {
238       Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface.\n"
239                        "Type=%s Database=%s User=%s\n"
240                        "It is probably not running or your password is incorrect.\n"),
241                         mdb->db_driver, mdb->db_name, mdb->db_user);
242       V(mutex);
243       return 0;           
244    } 
245    
246    Dmsg0(50, "dbi_real_connect done\n");
247    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
248                     mdb->db_user, mdb->db_name,
249                     mdb->db_password==NULL?"(NULL)":mdb->db_password);
250
251    mdb->connected = true;
252
253    if (!check_tables_version(jcr, mdb)) {
254       V(mutex);
255       return 0;
256    }
257    
258    switch (mdb->db_type) {
259    case SQL_TYPE_MYSQL:
260       /* Set connection timeout to 8 days specialy for batch mode */
261       sql_query(mdb, "SET wait_timeout=691200");
262       sql_query(mdb, "SET interactive_timeout=691200");
263       break;
264    case SQL_TYPE_POSTGRESQL:
265       /* tell PostgreSQL we are using standard conforming strings
266          and avoid warnings such as:
267          WARNING:  nonstandard use of \\ in a string literal
268       */
269       sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
270       sql_query(mdb, "set standard_conforming_strings=on");
271       break;
272    case SQL_TYPE_SQLITE:
273       break;
274    }
275    
276    V(mutex);
277    return 1;
278 }
279
280 void
281 db_close_database(JCR *jcr, B_DB *mdb)
282 {
283    if (!mdb) {
284       return;
285    }
286    db_end_transaction(jcr, mdb);
287    P(mutex);
288    sql_free_result(mdb);
289    mdb->ref_count--;
290    if (mdb->ref_count == 0) {
291       qdchain(&mdb->bq);
292       if (mdb->connected && mdb->db) {
293          //sql_close(mdb);
294          dbi_shutdown_r(mdb->instance);
295          mdb->db = NULL;
296          mdb->instance = NULL;
297       }
298       rwl_destroy(&mdb->lock);
299       free_pool_memory(mdb->errmsg);
300       free_pool_memory(mdb->cmd);
301       free_pool_memory(mdb->cached_path);
302       free_pool_memory(mdb->fname);
303       free_pool_memory(mdb->path);
304       free_pool_memory(mdb->esc_name);
305       free_pool_memory(mdb->esc_path);
306       if (mdb->db_name) {
307          free(mdb->db_name);
308       }
309       if (mdb->db_user) {
310          free(mdb->db_user);
311       }
312       if (mdb->db_password) {
313          free(mdb->db_password);
314       }
315       if (mdb->db_address) {
316          free(mdb->db_address);
317       }
318       if (mdb->db_socket) {
319          free(mdb->db_socket);
320       }
321       if (mdb->db_driverdir) {
322          free(mdb->db_driverdir);
323       }
324       if (mdb->db_driver) {
325           free(mdb->db_driver);
326       }
327       free(mdb);            
328    }   
329    V(mutex);
330 }
331
332 void db_thread_cleanup()
333 { }
334
335 /*
336  * Return the next unique index (auto-increment) for
337  * the given table.  Return NULL on error.
338  *
339  */
340 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
341 {
342    strcpy(index, "NULL");
343    return 1;
344 }
345
346
347 /*
348  * Escape strings so that DBI is happy
349  *
350  *   NOTE! len is the length of the old string. Your new
351  *         string must be long enough (max 2*old+1) to hold
352  *         the escaped output.
353  * 
354  * dbi_conn_quote_string_copy receives a pointer to pointer.
355  * We need copy the value of pointer to snew because libdbi change the 
356  * pointer
357  */
358 void
359 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
360 {
361    char *inew;
362    char *pnew;
363    
364    if (len == 0) {
365       snew[0] = 0; 
366    } else {
367       /* correct the size of old basead in len
368        * and copy new string to inew
369        */
370       inew = (char *)malloc(sizeof(char) * len + 1);
371       bstrncpy(inew,old,len + 1);
372       /* escape the correct size of old */
373       dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
374       free(inew);
375       /* copy the escaped string to snew */
376       bstrncpy(snew, pnew, 2 * len + 1);   
377    }
378    
379    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
380    
381 }
382
383 /*
384  * Submit a general SQL command (cmd), and for each row returned,
385  *  the sqlite_handler is called with the ctx.
386  */
387 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
388 {
389    SQL_ROW row;
390
391    Dmsg0(500, "db_sql_query started\n");
392
393    db_lock(mdb);
394    if (sql_query(mdb, query) != 0) {
395       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
396       db_unlock(mdb);
397       Dmsg0(500, "db_sql_query failed\n");
398       return false;
399    }
400    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
401
402    if (result_handler != NULL) {
403       Dmsg0(500, "db_sql_query invoking handler\n");
404       if ((mdb->result = sql_store_result(mdb)) != NULL) {
405          int num_fields = sql_num_fields(mdb);
406
407          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
408          while ((row = sql_fetch_row(mdb)) != NULL) {
409
410             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
411             if (result_handler(ctx, num_fields, row))
412                break;
413          }
414
415         sql_free_result(mdb);
416       }
417    }
418    db_unlock(mdb);
419
420    Dmsg0(500, "db_sql_query finished\n");
421
422    return true;
423 }
424
425
426
427 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
428 {
429    int j;
430    DBI_ROW row = NULL; // by default, return NULL
431
432    Dmsg0(500, "my_dbi_fetch_row start\n");
433    
434
435    if (!mdb->row || mdb->row_size < mdb->num_fields) {   
436       int num_fields = mdb->num_fields;
437       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
438
439       if (mdb->row) {
440          Dmsg0(500, "my_dbi_fetch_row freeing space\n");
441          Dmsg2(500, "my_dbi_free_row row: '%p' num_fields: '%d'\n", mdb->row, mdb->num_fields);
442          if (mdb->num_rows != 0) {
443             for(j = 0; j < mdb->num_fields; j++) {
444                Dmsg2(500, "my_dbi_free_row row '%p' '%d'\n", mdb->row[j], j);               
445                   if(mdb->row[j]) {
446                      free(mdb->row[j]);
447                   }                  
448             } 
449          }
450          free(mdb->row);
451       }
452       //num_fields += 20;                  /* add a bit extra */
453       mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
454       mdb->row_size = num_fields;
455
456       // now reset the row_number now that we have the space allocated
457       mdb->row_number = 1;
458    }
459
460    // if still within the result set
461    if (mdb->row_number <= mdb->num_rows) {
462       Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
463       // get each value from this row
464       for (j = 0; j < mdb->num_fields; j++) {
465          mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);         
466          // allocate space to queue row 
467          mdb->field_get = (DBI_FIELD_GET *)malloc(sizeof(DBI_FIELD_GET));
468          // store the pointer in queue
469          mdb->field_get->value = mdb->row[j];  
470          Dmsg4(500, "my_dbi_fetch_row row[%d] field: '%p' in queue: '%p' has value: '%s'\n",
471                j, mdb->row[j], mdb->field_get->value, mdb->row[j]);
472          // insert in queue to future free
473          qinsert(&dbi_getvalue_list, &mdb->field_get->bq);         
474       }
475       // increment the row number for the next call
476       mdb->row_number++;
477
478       row = mdb->row;
479    } else {
480       Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (1..%d)\n", mdb->row_number, mdb->num_rows);
481    }
482
483    Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
484
485    return row;
486 }
487
488 int my_dbi_max_length(B_DB *mdb, int field_num) {
489    //
490    // for a given column, find the max length
491    //
492    int max_length;
493    int i;
494    int this_length;
495    char *cbuf = NULL;
496
497    max_length = 0;
498    for (i = 0; i < mdb->num_rows; i++) {
499       if (my_dbi_getisnull(mdb->result, i, field_num)) {
500           this_length = 4;        // "NULL"
501       } else {
502          cbuf = my_dbi_getvalue(mdb->result, i, field_num);
503          this_length = cstrlen(cbuf);
504          // cbuf is always free
505          free(cbuf);
506       }
507
508       if (max_length < this_length) {
509           max_length = this_length;
510       }
511    }
512
513    return max_length;
514 }
515
516 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
517 {
518    int     i;
519    int     dbi_index;
520
521    Dmsg0(500, "my_dbi_fetch_field starts\n");
522
523    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
524       if (mdb->fields) {
525          free(mdb->fields);
526       }
527       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
528       mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
529       mdb->fields_size = mdb->num_fields;
530       
531       for (i = 0; i < mdb->num_fields; i++) {
532          // num_fileds is starting at 1, increment i by 1
533          dbi_index = i + 1;
534          Dmsg1(500, "filling field %d\n", i);
535          mdb->fields[i].name       = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
536          mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
537          mdb->fields[i].type       = dbi_result_get_field_type_idx(mdb->result, dbi_index);
538          mdb->fields[i].flags      = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
539
540          Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
541             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
542             mdb->fields[i].flags);
543       } // end for
544    } // end if
545
546    // increment field number for the next time around
547
548    Dmsg0(500, "my_dbi_fetch_field finishes\n");
549    return &mdb->fields[mdb->field_number++];
550 }
551
552 void my_dbi_data_seek(B_DB *mdb, int row)
553 {
554    // set the row number to be returned on the next call
555    // to my_dbi_fetch_row
556    mdb->row_number = row;
557 }
558
559 void my_dbi_field_seek(B_DB *mdb, int field)
560 {
561    mdb->field_number = field;
562 }
563
564 /*
565  * Note, if this routine returns 1 (failure), Bacula expects
566  *  that no result has been stored.
567  *
568  *  Returns:  0  on success
569  *            1  on failure
570  *
571  */
572 int my_dbi_query(B_DB *mdb, const char *query)
573 {
574    const char *errmsg;
575    Dmsg1(500, "my_dbi_query started %s\n", query);
576    // We are starting a new query.  reset everything.
577    mdb->num_rows     = -1;
578    mdb->row_number   = -1;
579    mdb->field_number = -1;
580
581    if (mdb->result) {
582       dbi_result_free(mdb->result);  /* hmm, someone forgot to free?? */
583       mdb->result = NULL;
584    }
585          
586    mdb->result = (void **)dbi_conn_query(mdb->db, query);
587       
588    if (!mdb->result) {
589       Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);      
590       goto bail_out;
591    }
592    
593    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db, &errmsg);
594    
595    if (mdb->status == DBI_ERROR_NONE) {
596       Dmsg1(500, "we have a result\n", query);
597
598       // how many fields in the set?
599       // num_fields starting at 1
600       mdb->num_fields = dbi_result_get_numfields(mdb->result);
601       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
602       // if no result num_rows is 0
603       mdb->num_rows = dbi_result_get_numrows(mdb->result);
604       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
605
606       mdb->status = (dbi_error_flag) 0;                  /* succeed */
607    } else {
608       Dmsg1(50, "Result status failed: %s\n", query);
609       goto bail_out;
610    }
611
612    Dmsg0(500, "my_dbi_query finishing\n");
613    return mdb->status;
614
615 bail_out:
616    mdb->status = (dbi_error_flag) dbi_conn_error(mdb->db,&errmsg);
617    //dbi_conn_error(mdb->db, &errmsg);
618    Dmsg4(500, "my_dbi_query we failed dbi error: "
619                    "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
620    dbi_result_free(mdb->result);
621    mdb->result = NULL;
622    mdb->status = (dbi_error_flag) 1;                   /* failed */
623    return mdb->status;
624 }
625
626 void my_dbi_free_result(B_DB *mdb)
627
628    
629    DBI_FIELD_GET *f;
630    db_lock(mdb);  
631    int i = 0;
632    if (mdb->result) {
633       Dmsg1(500, "my_dbi_free_result result '%p'\n", mdb->result);
634       dbi_result_free(mdb->result);
635    }
636
637    mdb->result = NULL;
638    
639    if (mdb->row) {
640       free(mdb->row);
641    }
642    
643    /* now is time to free all value return by my_dbi_get_value
644     * this is necessary because libdbi don't free memory return by yours results
645     * and Bacula has some routine wich call more than once time my_dbi_fetch_row
646     * 
647     * Using a queue to store all pointer allocate is a good way to free all things
648     * when necessary
649     */
650    while((f=(DBI_FIELD_GET *)qremove(&dbi_getvalue_list))) {
651       Dmsg2(500, "my_dbi_free_result field value: '%p' in queue: '%p'\n", f->value, f);
652       free(f->value);
653       free(f);
654    }
655       
656    mdb->row = NULL;
657    
658    if (mdb->fields) {
659       free(mdb->fields);
660       mdb->fields = NULL;
661    }
662    db_unlock(mdb);
663    Dmsg0(500, "my_dbi_free_result finish\n");
664    
665 }
666
667 const char *my_dbi_strerror(B_DB *mdb) 
668 {
669    const char *errmsg;
670    
671    dbi_conn_error(mdb->db, &errmsg);
672         
673    return errmsg;
674 }
675
676 #ifdef HAVE_BATCH_FILE_INSERT
677
678 /*
679  * This can be a bit strang but is the one way to do
680  * 
681  * Returns 1 if OK
682  *         0 if failed
683  */
684 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
685 {
686    char *query = "COPY batch FROM STDIN";
687    
688    Dmsg0(500, "my_dbi_batch_start started\n");
689    
690    switch (mdb->db_type) {
691    case SQL_TYPE_MYSQL:
692       db_lock(mdb);
693       if (my_dbi_query(mdb,
694                               "CREATE TEMPORARY TABLE batch ("
695                                   "FileIndex integer,"
696                                   "JobId integer,"
697                                   "Path blob,"
698                                   "Name blob,"
699                                   "LStat tinyblob,"
700                                   "MD5 tinyblob)") == 1)
701       {
702          Dmsg0(500, "my_dbi_batch_start failed\n");
703          return 1;
704       }
705       db_unlock(mdb);
706       Dmsg0(500, "my_dbi_batch_start finishing\n");
707       return 1;
708       break;
709    case SQL_TYPE_POSTGRESQL:
710       
711       //query = "COPY batch FROM STDIN";
712
713       if (my_dbi_query(mdb,
714                               "CREATE TEMPORARY TABLE batch ("
715                                   "fileindex int,"
716                                   "jobid int,"
717                                   "path varchar,"
718                                   "name varchar,"
719                                   "lstat varchar,"
720                                   "md5 varchar)") == 1)
721       {
722          Dmsg0(500, "my_dbi_batch_start failed\n");
723          return 1;
724       }
725       
726       // We are starting a new query.  reset everything.
727       mdb->num_rows     = -1;
728       mdb->row_number   = -1;
729       mdb->field_number = -1;
730
731       my_dbi_free_result(mdb);
732
733       for (int i=0; i < 10; i++) {
734          my_dbi_query(mdb, query);
735          if (mdb->result) {
736             break;
737          }
738          bmicrosleep(5, 0);
739       }
740       if (!mdb->result) {
741          Dmsg1(50, "Query failed: %s\n", query);
742          goto bail_out;
743       }
744
745       mdb->status = (dbi_error_flag)dbi_conn_error(mdb->db, NULL);
746       //mdb->status = DBI_ERROR_NONE;
747          
748       if (mdb->status == DBI_ERROR_NONE) {
749          // how many fields in the set?
750          mdb->num_fields = dbi_result_get_numfields(mdb->result);
751          mdb->num_rows   = dbi_result_get_numrows(mdb->result);
752          mdb->status = (dbi_error_flag) 1;
753       } else {
754          Dmsg1(50, "Result status failed: %s\n", query);
755          goto bail_out;
756       }
757
758       Dmsg0(500, "my_postgresql_batch_start finishing\n");
759
760       return mdb->status;
761       break;
762    case SQL_TYPE_SQLITE:
763       db_lock(mdb);
764       if (my_dbi_query(mdb,
765                               "CREATE TEMPORARY TABLE batch ("
766                                   "FileIndex integer,"
767                                   "JobId integer,"
768                                   "Path blob,"
769                                   "Name blob,"
770                                   "LStat tinyblob,"
771                                   "MD5 tinyblob)") == 1)
772       {
773          Dmsg0(500, "my_dbi_batch_start failed\n");
774          goto bail_out;
775       }
776       db_unlock(mdb);
777       Dmsg0(500, "my_dbi_batch_start finishing\n");
778       return 1;
779       break;
780    }
781    
782 bail_out:
783    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), my_dbi_strerror(mdb));
784    mdb->status = (dbi_error_flag) 0;
785    my_dbi_free_result(mdb);
786    mdb->result = NULL;
787    return mdb->status;
788 }
789
790 /* set error to something to abort operation */
791 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
792 {
793    int res = 0;
794    int count = 30;
795    int (*custom_function)(void*, const char*) = NULL;  
796    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
797    
798    Dmsg0(500, "my_dbi_batch_end started\n");
799
800    if (!mdb) {                  /* no files ? */
801       return 0;
802    }
803
804    switch (mdb->db_type) {
805    case SQL_TYPE_MYSQL:
806       if(mdb) {
807          mdb->status = (dbi_error_flag) 0;
808       }
809       break;
810    case SQL_TYPE_POSTGRESQL:
811       custom_function = (custom_function_end_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQputCopyEnd");
812
813                   
814       do { 
815          res = (*custom_function)(myconn->connection, error);         
816       } while (res == 0 && --count > 0);
817
818       if (res == 1) {
819          Dmsg0(500, "ok\n");
820          mdb->status = (dbi_error_flag) 1;
821       }
822          
823       if (res <= 0) {
824          Dmsg0(500, "we failed\n");
825          mdb->status = (dbi_error_flag) 0;
826          //Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
827        }
828       break;
829    case SQL_TYPE_SQLITE:
830       if(mdb) {
831          mdb->status = (dbi_error_flag) 0;
832       }
833       break;
834    }
835
836    Dmsg0(500, "my_dbi_batch_end finishing\n");
837
838    return true;      
839 }
840
841 /* 
842  * This function is big and use a big switch.  
843  * In near future is better split in small functions
844  * and refactory.
845  * 
846  */
847 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
848 {
849    int res;
850    int count=30;   
851    dbi_conn_t *myconn = (dbi_conn_t *)(mdb->db);
852    int (*custom_function)(void*, const char*, int) = NULL;
853    char* (*custom_function_error)(void*) = NULL;
854    size_t len;
855    char *digest;
856    char ed1[50];
857
858    Dmsg0(500, "my_dbi_batch_insert started \n");
859
860    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);    
861    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
862
863    if (ar->Digest == NULL || ar->Digest[0] == 0) {
864       digest = "0";
865    } else {
866       digest = ar->Digest;
867    }
868
869    switch (mdb->db_type) {
870    case SQL_TYPE_MYSQL:
871       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
872       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
873       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
874                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path, 
875                       mdb->esc_name, ar->attr, digest);
876       
877       if (my_dbi_query(mdb,mdb->cmd) == 1)
878       {
879          Dmsg0(500, "my_dbi_batch_insert failed\n");
880          goto bail_out;
881       }
882       
883       Dmsg0(500, "my_dbi_batch_insert finishing\n");
884       
885       return 1;
886       break;
887    case SQL_TYPE_POSTGRESQL:
888       my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
889       my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
890       len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
891                      ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
892                      mdb->esc_name, ar->attr, digest);
893       
894       /* libdbi don't support CopyData and we need call a postgresql
895        * specific function to do this work
896        */
897       Dmsg2(500, "my_dbi_batch_insert :\n %s \ncmd_size: %d",mdb->cmd, len);
898       if ((custom_function = (custom_function_insert_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db),
899             "PQputCopyData")) != NULL) {
900          do { 
901             res = (*custom_function)(myconn->connection, mdb->cmd, len);
902          } while (res == 0 && --count > 0);
903
904          if (res == 1) {
905             Dmsg0(500, "ok\n");
906             mdb->changes++;
907             mdb->status = (dbi_error_flag) 1;
908          }
909
910          if (res <= 0) {
911             Dmsg0(500, "my_dbi_batch_insert failed\n");
912             goto bail_out;
913          }
914
915          Dmsg0(500, "my_dbi_batch_insert finishing\n");
916          return mdb->status;
917       } else {
918          // ensure to detect a PQerror 
919          custom_function_error = (custom_function_error_t)dbi_driver_specific_function(dbi_conn_get_driver(mdb->db), "PQerrorMessage");
920          Dmsg1(500, "my_dbi_batch_insert failed\n PQerrorMessage: %s", (*custom_function_error)(myconn->connection));
921          goto bail_out;
922       }
923       break;
924    case SQL_TYPE_SQLITE:
925       db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
926       db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
927       len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
928                       ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path, 
929                       mdb->esc_name, ar->attr, digest);
930       if (my_dbi_query(mdb,mdb->cmd) == 1)
931       {
932          Dmsg0(500, "my_dbi_batch_insert failed\n");
933          goto bail_out;
934       }
935       
936       Dmsg0(500, "my_dbi_batch_insert finishing\n");
937       
938       return 1;
939       break;
940    }
941     
942 bail_out:
943   Mmsg1(&mdb->errmsg, _("error inserting batch mode: %s"), my_dbi_strerror(mdb));
944   mdb->status = (dbi_error_flag) 0;
945   my_dbi_free_result(mdb);  
946   return mdb->status;
947 }
948
949 /*
950  * Escape strings so that PostgreSQL is happy on COPY
951  *
952  *   NOTE! len is the length of the old string. Your new
953  *         string must be long enough (max 2*old+1) to hold
954  *         the escaped output.
955  */
956 char *my_postgresql_copy_escape(char *dest, char *src, size_t len)
957 {
958    /* we have to escape \t, \n, \r, \ */
959    char c = '\0' ;
960
961    while (len > 0 && *src) {
962       switch (*src) {
963       case '\n':
964          c = 'n';
965          break;
966       case '\\':
967          c = '\\';
968          break;
969       case '\t':
970          c = 't';
971          break;
972       case '\r':
973          c = 'r';
974          break;
975       default:
976          c = '\0' ;
977       }
978
979       if (c) {
980          *dest = '\\';
981          dest++;
982          *dest = c;
983       } else {
984          *dest = *src;
985       }
986
987       len--;
988       src++;
989       dest++;
990    }
991
992    *dest = '\0';
993    return dest;
994 }
995
996 #endif /* HAVE_BATCH_FILE_INSERT */
997
998 /* my_dbi_getisnull
999  * like PQgetisnull
1000  * int PQgetisnull(const PGresult *res,
1001  *              int row_number,
1002  *               int column_number);
1003  * 
1004  *  use dbi_result_seek_row to search in result set
1005  */
1006 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
1007    int i;
1008
1009    if(row_number == 0) {
1010       row_number++;
1011    }
1012    
1013    column_number++;
1014    
1015    if(dbi_result_seek_row(result, row_number)) {
1016
1017       i = dbi_result_field_is_null_idx(result,column_number);
1018
1019       return i;
1020    } else {
1021            
1022       return 0;
1023    }
1024                 
1025 }
1026 /* my_dbi_getvalue 
1027  * like PQgetvalue;
1028  * char *PQgetvalue(const PGresult *res,
1029  *                int row_number,
1030  *                int column_number);
1031  *
1032  * use dbi_result_seek_row to search in result set
1033  * use example to return only strings
1034  */
1035 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
1036
1037    char *buf = NULL;
1038    const char *errmsg;
1039    const char *field_name;     
1040    unsigned short dbitype;
1041    size_t field_length;
1042    int64_t num;
1043         
1044    /* correct the index for dbi interface
1045     * dbi index begins 1
1046     * I prefer do not change others functions
1047     */
1048    Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n", 
1049                                 result, row_number, column_number);
1050         
1051    column_number++;
1052
1053    if(row_number == 0) {
1054      row_number++;
1055    }
1056       
1057    Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n", 
1058                         result, row_number, column_number);
1059    
1060    if(dbi_result_seek_row(result, row_number)) {
1061
1062       field_name = dbi_result_get_field_name(result, column_number);
1063       field_length = dbi_result_get_field_length(result, field_name);
1064       dbitype = dbi_result_get_field_type_idx(result,column_number);
1065       
1066       Dmsg3(500, "my_dbi_getvalue start: type: '%d' "
1067             "field_length bytes: '%d' fieldname: '%s'\n", 
1068             dbitype, field_length, field_name);
1069       
1070       if(field_length) {
1071          //buf = (char *)malloc(sizeof(char *) * field_length + 1);
1072          buf = (char *)malloc(field_length + 1);
1073       } else {
1074          /* if numbers */
1075          buf = (char *)malloc(sizeof(char *) * 50);
1076       }           
1077       
1078       switch (dbitype) {
1079       case DBI_TYPE_INTEGER:
1080          num = dbi_result_get_longlong(result, field_name);         
1081          edit_int64(num, buf);
1082          field_length = strlen(buf);
1083          break;
1084       case DBI_TYPE_STRING:
1085          if(field_length) {
1086             field_length = bsnprintf(buf, field_length + 1, "%s", 
1087             dbi_result_get_string(result, field_name));
1088          } else {
1089             buf[0] = 0;
1090          }
1091          break;
1092       case DBI_TYPE_BINARY:
1093          /* dbi_result_get_binary return a NULL pointer if value is empty
1094          * following, change this to what Bacula espected
1095          */
1096          if(field_length) {
1097             field_length = bsnprintf(buf, field_length + 1, "%s", 
1098                   dbi_result_get_binary(result, field_name));
1099          } else {
1100             buf[0] = 0;
1101          }
1102          break;
1103       case DBI_TYPE_DATETIME:
1104          time_t last;
1105          struct tm tm;
1106          
1107          last = dbi_result_get_datetime(result, field_name);
1108          
1109          if(last == -1) {
1110                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00"); 
1111          } else {
1112             (void)localtime_r(&last, &tm);
1113             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
1114                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
1115                   tm.tm_hour, tm.tm_min, tm.tm_sec);
1116          }
1117          break;
1118       }
1119
1120    } else {
1121       dbi_conn_error(dbi_result_get_conn(result), &errmsg);
1122       Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
1123    }
1124                 
1125    Dmsg3(500, "my_dbi_getvalue finish buffer: '%p' num bytes: '%d' data: '%s'\n", 
1126       buf, field_length, buf);
1127       
1128    // don't worry about this buf
1129    return buf;
1130 }
1131
1132 int my_dbi_sql_insert_id(B_DB *mdb, char *table_name)
1133 {
1134    /*
1135     Obtain the current value of the sequence that
1136     provides the serial value for primary key of the table.
1137
1138     currval is local to our session.  It is not affected by
1139     other transactions.
1140
1141     Determine the name of the sequence.
1142     PostgreSQL automatically creates a sequence using
1143     <table>_<column>_seq.
1144     At the time of writing, all tables used this format for
1145     for their primary key: <table>id
1146     Except for basefiles which has a primary key on baseid.
1147     Therefore, we need to special case that one table.
1148
1149     everything else can use the PostgreSQL formula.
1150    */
1151    
1152    char      sequence[30];  
1153    uint64_t    id = 0;
1154
1155    if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
1156            
1157       if (strcasecmp(table_name, "basefiles") == 0) {
1158          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
1159       } else {
1160          bstrncpy(sequence, table_name, sizeof(sequence));
1161          bstrncat(sequence, "_",        sizeof(sequence));
1162          bstrncat(sequence, table_name, sizeof(sequence));
1163          bstrncat(sequence, "id",       sizeof(sequence));
1164       }
1165
1166       bstrncat(sequence, "_seq", sizeof(sequence));
1167       id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
1168    } else {
1169       id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
1170    }
1171    
1172    return id;
1173 }
1174
1175 #ifdef HAVE_BATCH_FILE_INSERT
1176 const char *my_dbi_batch_lock_path_query[3] = {
1177    /* Mysql */
1178    "LOCK TABLES Path write, batch write, Path as p write",
1179    /* Postgresql */
1180    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE",
1181    /* SQLite */
1182    "BEGIN"};  
1183
1184 const char *my_dbi_batch_lock_filename_query[3] = {
1185    /* Mysql */
1186    "LOCK TABLES Filename write, batch write, Filename as f write",
1187    /* Postgresql */
1188    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE",
1189    /* SQLite */
1190    "BEGIN"};
1191
1192 const char *my_dbi_batch_unlock_tables_query[3] = {
1193    /* Mysql */
1194    "UNLOCK TABLES",
1195    /* Postgresql */
1196    "COMMIT",
1197    /* SQLite */
1198    "COMMIT"};
1199
1200 const char *my_dbi_batch_fill_path_query[3] = {
1201    /* Mysql */
1202    "INSERT INTO Path (Path) "
1203    "SELECT a.Path FROM " 
1204    "(SELECT DISTINCT Path FROM batch) AS a WHERE NOT EXISTS "
1205    "(SELECT Path FROM Path AS p WHERE p.Path = a.Path)",
1206    /* Postgresql */
1207    "INSERT INTO Path (Path) "
1208    "SELECT a.Path FROM "
1209    "(SELECT DISTINCT Path FROM batch) AS a "
1210    "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ",
1211    /* SQLite */
1212    "INSERT INTO Path (Path)" 
1213    " SELECT DISTINCT Path FROM batch"
1214    " EXCEPT SELECT Path FROM Path"};
1215
1216 const char *my_dbi_batch_fill_filename_query[3] = {
1217    /* Mysql */
1218    "INSERT INTO Filename (Name) "
1219    "SELECT a.Name FROM " 
1220    "(SELECT DISTINCT Name FROM batch) AS a WHERE NOT EXISTS "
1221    "(SELECT Name FROM Filename AS f WHERE f.Name = a.Name)",
1222    /* Postgresql */
1223    "INSERT INTO Filename (Name) "
1224    "SELECT a.Name FROM "
1225    "(SELECT DISTINCT Name FROM batch) as a "
1226    "WHERE NOT EXISTS "
1227    "(SELECT Name FROM Filename WHERE Name = a.Name)",
1228    /* SQLite */
1229    "INSERT INTO Filename (Name)"
1230    " SELECT DISTINCT Name FROM batch "
1231    " EXCEPT SELECT Name FROM Filename"};
1232 #endif /* HAVE_BATCH_FILE_INSERT */
1233
1234 #endif /* HAVE_DBI */