]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/dbi.c
Apply patch (with some difficulties) from Joao Henrique Freitas
[bacula/bacula] / bacula / src / cats / dbi.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to DBI
30  *   These are DBI specific routines
31  *
32  *    João Henrique Freitas, December 2007
33  *    based upon work done by Dan Langille, December 2003 and
34  *    by Kern Sibbald, March 2000
35  *
36  *    Version $Id$
37  */
38
39
40 /* The following is necessary so that we do not include
41  * the dummy external definition of DB.
42  */
43 #define __SQL_C                       /* indicate that this is sql.c */
44
45 #include "bacula.h"
46 #include "cats.h"
47
48 #ifdef HAVE_DBI
49
50 /* -----------------------------------------------------------------------
51  *
52  *   DBI dependent defines and subroutines
53  *
54  * -----------------------------------------------------------------------
55  */
56
57 /* List of open databases */
58 static BQUEUE db_list = {&db_list, &db_list};
59
60 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
61
62 /*
63  * Retrieve database type
64  */
65 const char *
66 db_get_type(void)
67 {
68    return "DBI";
69 }
70
71 /*
72  * Initialize database data structure. In principal this should
73  * never have errors, or it is really fatal.
74  */
75 B_DB *
76 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
77                  const char *db_address, int db_port, const char *db_socket, 
78                  int mult_db_connections)
79 {
80    B_DB *mdb;
81    char db_driver[10];
82    char db_driverdir[256];
83
84    /* Constraint the db_driver */
85    if(db_type  == -1) {
86       Jmsg(jcr, M_FATAL, 0, _("A dbi driver for DBI must be supplied.\n"));
87       return NULL;
88    }
89    
90    /* Do the correct selection of driver. 
91     * Can be one of the varius supported by libdbi 
92     */
93    switch (db_type) {
94    case SQL_TYPE_MYSQL:
95       bstrncpy(db_driver,"mysql", sizeof(db_driver));
96       break;
97    case SQL_TYPE_POSTGRESQL:
98       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
99       break;
100    case SQL_TYPE_SQLITE:
101       bstrncpy(db_driver,"pgsql", sizeof(db_driver));
102       break;
103    }
104    
105    /* Set db_driverdir whereis is the libdbi drivers */
106    bstrncpy(db_driverdir, DBI_DRIVER_DIR, 255);
107    
108    if (!db_user) {
109       Jmsg(jcr, M_FATAL, 0, _("A user name for DBI must be supplied.\n"));
110       return NULL;
111    }
112    P(mutex);                          /* lock DB queue */
113    if (!mult_db_connections) {
114       /* Look to see if DB already open */
115       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
116          if (bstrcmp(mdb->db_name, db_name) &&
117              bstrcmp(mdb->db_address, db_address) &&
118              bstrcmp(mdb->db_driver, db_driver) &&
119              mdb->db_port == db_port) { 
120             Dmsg3(100, "DB REopen %d %s %s\n", mdb->ref_count, db_driver, db_name);
121             mdb->ref_count++;
122             V(mutex);
123             return mdb;                  /* already open */
124          }
125       }
126    }
127    Dmsg0(100, "db_open first time\n");
128    mdb = (B_DB *)malloc(sizeof(B_DB));
129    memset(mdb, 0, sizeof(B_DB));
130    mdb->db_name = bstrdup(db_name);
131    mdb->db_user = bstrdup(db_user);
132    if (db_password) {
133       mdb->db_password = bstrdup(db_password);
134    }
135    if (db_address) {
136       mdb->db_address  = bstrdup(db_address);
137    }
138    if (db_socket) {
139       mdb->db_socket   = bstrdup(db_socket);
140    }
141    if (db_driverdir) {
142       mdb->db_driverdir = bstrdup(db_driverdir);
143    }
144    if (db_driver) {
145       mdb->db_driver    = bstrdup(db_driver);
146    }
147    mdb->db_type        = db_type;
148    mdb->db_port        = db_port;
149    mdb->have_insert_id = TRUE;
150    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
151    *mdb->errmsg        = 0;
152    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
153    mdb->cached_path    = get_pool_memory(PM_FNAME);
154    mdb->cached_path_id = 0;
155    mdb->ref_count      = 1;
156    mdb->fname          = get_pool_memory(PM_FNAME);
157    mdb->path           = get_pool_memory(PM_FNAME);
158    mdb->esc_name       = get_pool_memory(PM_FNAME);
159    mdb->esc_path      = get_pool_memory(PM_FNAME);
160    mdb->allow_transactions = mult_db_connections;
161    qinsert(&db_list, &mdb->bq);            /* put db in list */
162    V(mutex);
163    return mdb;
164 }
165
166 /*
167  * Now actually open the database.  This can generate errors,
168  *   which are returned in the errmsg
169  *
170  * DO NOT close the database or free(mdb) here  !!!!
171  */
172 int
173 db_open_database(JCR *jcr, B_DB *mdb)
174 {   
175    int errstat;
176    int dbstat;
177    const char *errmsg;
178    char buf[10], *port;
179    int numdrivers; 
180    
181    P(mutex);
182    if (mdb->connected) {
183       V(mutex);
184       return 1;
185    }
186    mdb->connected = false;
187
188    if ((errstat=rwl_init(&mdb->lock)) != 0) {
189       berrno be;
190       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
191             be.bstrerror(errstat));
192       V(mutex);
193       return 0;
194    }
195
196    if (mdb->db_port) {
197       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
198       port = buf;
199    } else {
200       port = NULL;
201    }
202    
203    numdrivers = dbi_initialize(mdb->db_driverdir);
204    if (numdrivers < 0) {
205       dbi_shutdown();
206       Mmsg2(&mdb->errmsg, _("Unable to locate the DBD drivers to DBI interface in: \n"
207                                "db_driverdir=%s. It is probaly not found any drivers\n"),
208                                mdb->db_driverdir,numdrivers);
209       V(mutex);
210       return 0;
211    }
212    mdb->db = (void **)dbi_conn_new(mdb->db_driver);
213    dbi_conn_set_option(mdb->db, "host", mdb->db_address); /* default = localhost */
214    dbi_conn_set_option(mdb->db, "port", port);            /* default port */
215    dbi_conn_set_option(mdb->db, "username", mdb->db_user);     /* login name */
216    dbi_conn_set_option(mdb->db, "password", mdb->db_password); /* password */
217    dbi_conn_set_option(mdb->db, "dbname", mdb->db_name);       /* database name */
218       
219    /* If connection fails, try at 5 sec intervals for 30 seconds. */
220    for (int retry=0; retry < 6; retry++) {
221          
222       dbstat = dbi_conn_connect(mdb->db); 
223       if ( dbstat == 0) {
224          break;
225       }
226          
227       dbi_conn_error(mdb->db, &errmsg);
228       Dmsg1(50, "dbi error: %s\n", errmsg);
229                    
230       bmicrosleep(5, 0);
231          
232    }
233    
234    if ( dbstat != 0 ) {
235       Mmsg3(&mdb->errmsg, _("Unable to connect to DBI interface.\n"
236                        "Type=%s Database=%s User=%s\n"
237                        "It is probably not running or your password is incorrect.\n"),
238                         mdb->db_driver, mdb->db_name, mdb->db_user);
239       V(mutex);
240       return 0;           
241    } 
242    
243    Dmsg0(50, "dbi_real_connect done\n");
244    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n",
245                     mdb->db_user, mdb->db_name,
246                     mdb->db_password==NULL?"(NULL)":mdb->db_password);
247
248    mdb->connected = true;
249
250    if (!check_tables_version(jcr, mdb)) {
251       V(mutex);
252       return 0;
253    }
254    
255    switch (mdb->db_type) {
256    case SQL_TYPE_MYSQL:
257       /* Set connection timeout to 8 days specialy for batch mode */
258       sql_query(mdb, "SET wait_timeout=691200");
259       sql_query(mdb, "SET interactive_timeout=691200");
260       break;
261    case SQL_TYPE_POSTGRESQL:
262       /* tell PostgreSQL we are using standard conforming strings
263          and avoid warnings such as:
264          WARNING:  nonstandard use of \\ in a string literal
265       */
266       sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
267       sql_query(mdb, "set standard_conforming_strings=on");
268       break;
269    case SQL_TYPE_SQLITE:
270       break;
271    }
272    
273    V(mutex);
274    return 1;
275 }
276
277 void
278 db_close_database(JCR *jcr, B_DB *mdb)
279 {
280    if (!mdb) {
281       return;
282    }
283    db_end_transaction(jcr, mdb);
284    P(mutex);
285    sql_free_result(mdb);
286    mdb->ref_count--;
287    if (mdb->ref_count == 0) {
288       qdchain(&mdb->bq);
289       if (mdb->connected && mdb->db) {
290          sql_close(mdb);
291          mdb->db = NULL;
292       }
293       rwl_destroy(&mdb->lock);
294       free_pool_memory(mdb->errmsg);
295       free_pool_memory(mdb->cmd);
296       free_pool_memory(mdb->cached_path);
297       free_pool_memory(mdb->fname);
298       free_pool_memory(mdb->path);
299       free_pool_memory(mdb->esc_name);
300       free_pool_memory(mdb->esc_path);
301       if (mdb->db_name) {
302          free(mdb->db_name);
303       }
304       if (mdb->db_user) {
305          free(mdb->db_user);
306       }
307       if (mdb->db_password) {
308          free(mdb->db_password);
309       }
310       if (mdb->db_address) {
311          free(mdb->db_address);
312       }
313       if (mdb->db_socket) {
314          free(mdb->db_socket);
315       }
316       dbi_shutdown();
317       if (mdb->db_driver) {
318           free(mdb->db_driver);
319       }
320       free(mdb);
321       
322       
323    }   
324    V(mutex);
325 }
326
327 void db_thread_cleanup()
328 { }
329
330 /*
331  * Return the next unique index (auto-increment) for
332  * the given table.  Return NULL on error.
333  *
334  */
335 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
336 {
337    strcpy(index, "NULL");
338    return 1;
339 }
340
341
342 /*
343  * Escape strings so that DBI is happy
344  *
345  *   NOTE! len is the length of the old string. Your new
346  *         string must be long enough (max 2*old+1) to hold
347  *         the escaped output.
348  * 
349  * dbi_conn_quote_string_copy receives a pointer to pointer.
350  * We need copy the value of pointer to snew. Because libdbi change the 
351  * pointer
352  */
353 void
354 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
355 {
356    char *inew;
357    char *pnew;
358    
359    if (len == 0) {
360           snew[0] = 0; 
361    } else {
362           /* correct the size of old basead in len and copy new string to inew */
363           inew = (char *)malloc(sizeof(char) * len + 1);
364           bstrncpy(inew,old,len + 1);
365           /* escape the correct size of old */
366           dbi_conn_escape_string_copy(mdb->db, inew, &pnew);
367           /* copy the escaped string to snew */
368       bstrncpy(snew, pnew, 2 * len + 1);   
369    }
370    
371    Dmsg2(500, "dbi_conn_escape_string_copy %p %s\n",snew,snew);
372    
373 }
374
375 /*
376  * Submit a general SQL command (cmd), and for each row returned,
377  *  the sqlite_handler is called with the ctx.
378  */
379 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
380 {
381    SQL_ROW row;
382
383    Dmsg0(500, "db_sql_query started\n");
384
385    db_lock(mdb);
386    if (sql_query(mdb, query) != 0) {
387       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
388       db_unlock(mdb);
389       Dmsg0(500, "db_sql_query failed\n");
390       return false;
391    }
392    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
393
394    if (result_handler != NULL) {
395       Dmsg0(500, "db_sql_query invoking handler\n");
396       if ((mdb->result = sql_store_result(mdb)) != NULL) {
397          int num_fields = sql_num_fields(mdb);
398
399          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
400          while ((row = sql_fetch_row(mdb)) != NULL) {
401
402             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
403             if (result_handler(ctx, num_fields, row))
404                break;
405          }
406
407         sql_free_result(mdb);
408       }
409    }
410    db_unlock(mdb);
411
412    Dmsg0(500, "db_sql_query finished\n");
413
414    return true;
415 }
416
417
418
419 DBI_ROW my_dbi_fetch_row(B_DB *mdb)
420 {
421    int j;
422    DBI_ROW row = NULL; // by default, return NULL
423
424    Dmsg0(500, "my_dbi_fetch_row start\n");
425
426    if (!mdb->row || mdb->row_size < mdb->num_fields) {
427       int num_fields = mdb->num_fields;
428       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
429
430       if (mdb->row) {
431          Dmsg0(500, "my_dbi_fetch_row freeing space\n");
432          free(mdb->row);
433       }
434       num_fields += 20;                  /* add a bit extra */
435       mdb->row = (DBI_ROW)malloc(sizeof(char *) * num_fields);
436       mdb->row_size = num_fields;
437
438       // now reset the row_number now that we have the space allocated
439       mdb->row_number = 1;
440    }
441
442    // if still within the result set
443    if (mdb->row_number <= mdb->num_rows) {
444       Dmsg2(500, "my_dbi_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
445       // get each value from this row
446       for (j = 0; j < mdb->num_fields; j++) {
447          mdb->row[j] = my_dbi_getvalue(mdb->result, mdb->row_number, j);
448          Dmsg2(500, "my_dbi_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
449       }
450       // increment the row number for the next call
451       mdb->row_number++;
452
453       row = mdb->row;
454    } else {
455       Dmsg2(500, "my_dbi_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
456    }
457
458    Dmsg1(500, "my_dbi_fetch_row finishes returning %p\n", row);
459
460    return row;
461 }
462
463 int my_dbi_max_length(B_DB *mdb, int field_num) {
464    //
465    // for a given column, find the max length
466    //
467    int max_length;
468    int i;
469    int this_length;
470
471    max_length = 0;
472    for (i = 0; i < mdb->num_rows; i++) {
473       if (my_dbi_getisnull(mdb->result, i, field_num)) {
474           this_length = 4;        // "NULL"
475       } else {
476           // TODO: error
477           this_length = cstrlen(my_dbi_getvalue(mdb->result, i, field_num));
478       }
479
480       if (max_length < this_length) {
481           max_length = this_length;
482       }
483    }
484
485    return max_length;
486 }
487
488 DBI_FIELD * my_dbi_fetch_field(B_DB *mdb)
489 {
490    int     i;
491    int     dbi_index;
492
493    Dmsg0(500, "my_dbi_fetch_field starts\n");
494
495    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
496       if (mdb->fields) {
497          free(mdb->fields);
498       }
499       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
500       mdb->fields = (DBI_FIELD *)malloc(sizeof(DBI_FIELD) * mdb->num_fields);
501       mdb->fields_size = mdb->num_fields;
502
503       for (i = 0; i < mdb->num_fields; i++) {
504          dbi_index = i + 1;
505          Dmsg1(500, "filling field %d\n", i);
506          mdb->fields[i].name       = (char *)dbi_result_get_field_name(mdb->result, dbi_index);
507          mdb->fields[i].max_length = my_dbi_max_length(mdb, i);
508          mdb->fields[i].type       = dbi_result_get_field_type_idx(mdb->result, dbi_index);
509          mdb->fields[i].flags      = dbi_result_get_field_attribs_idx(mdb->result, dbi_index);
510
511          Dmsg4(500, "my_dbi_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
512             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
513             mdb->fields[i].flags);
514       } // end for
515    } // end if
516
517    // increment field number for the next time around
518
519    Dmsg0(500, "my_dbi_fetch_field finishes\n");
520    return &mdb->fields[mdb->field_number++];
521 }
522
523 void my_dbi_data_seek(B_DB *mdb, int row)
524 {
525    // set the row number to be returned on the next call
526    // to my_dbi_fetch_row
527    mdb->row_number = row;
528 }
529
530 void my_dbi_field_seek(B_DB *mdb, int field)
531 {
532    mdb->field_number = field;
533 }
534
535 /*
536  * Note, if this routine returns 1 (failure), Bacula expects
537  *  that no result has been stored.
538  *
539  *  Returns:  0  on success
540  *            1  on failure
541  *
542  */
543 int my_dbi_query(B_DB *mdb, const char *query)
544 {
545    const char *errmsg;
546    Dmsg1(500, "my_dbi_query started %s\n", query);
547    // We are starting a new query.  reset everything.
548    mdb->num_rows     = -1;
549    mdb->row_number   = -1;
550    mdb->field_number = -1;
551
552    if (mdb->result) {
553       dbi_result_free(mdb->result);  /* hmm, someone forgot to free?? */
554       mdb->result = NULL;
555    }
556
557    //for (int i=0; i < 10; i++) {
558           
559       mdb->result = (void **)dbi_conn_query(mdb->db, query);
560       
561    //   if (mdb->result) {
562    //      break;
563    //   }
564    //   bmicrosleep(5, 0);
565    //}
566    if (mdb->result == NULL) {
567       Dmsg2(50, "Query failed: %s %p\n", query, mdb->result);      
568       goto bail_out;
569    }
570
571    //mdb->status = (dbi_error_flag)dbi_conn_error_flag(mdb->db);
572    mdb->status = DBI_ERROR_NONE;
573    
574    if (mdb->status == DBI_ERROR_NONE) {
575       Dmsg1(500, "we have a result\n", query);
576
577       // how many fields in the set?
578       mdb->num_fields = dbi_result_get_numfields(mdb->result);
579       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
580
581       mdb->num_rows = dbi_result_get_numrows(mdb->result);
582       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
583
584       mdb->status = (dbi_error_flag) 0;                  /* succeed */
585    } else {
586       Dmsg1(50, "Result status failed: %s\n", query);
587       goto bail_out;
588    }
589
590    Dmsg0(500, "my_dbi_query finishing\n");
591    return mdb->status;
592
593 bail_out:
594    mdb->status = dbi_conn_error_flag(mdb->db);
595    dbi_conn_error(mdb->db, &errmsg);
596    Dmsg4(500, "my_dbi_query we failed dbi error "
597                    "'%s' '%p' '%d' flag '%d''\n", errmsg, mdb->result, mdb->result, mdb->status);
598    dbi_result_free(mdb->result);
599    mdb->result = NULL;
600    mdb->status = (dbi_error_flag) 1;                   /* failed */
601    return mdb->status;
602 }
603
604 void my_dbi_free_result(B_DB *mdb)
605 {
606    int i;
607    
608    db_lock(mdb);
609    //Dmsg2(500, "my_dbi_free_result started result '%p' '%p'\n", mdb->result, mdb->result);
610    if (mdb->result != NULL) {
611           i = dbi_result_free(mdb->result);
612       if(i == 0) {
613          mdb->result = NULL;
614          //Dmsg2(500, "my_dbi_free_result result '%p' '%d'\n", mdb->result, mdb->result);
615       }
616       
617    }
618
619    if (mdb->row) {
620       free(mdb->row);
621       mdb->row = NULL;
622    }
623
624    if (mdb->fields) {
625       free(mdb->fields);
626       mdb->fields = NULL;
627    }
628    db_unlock(mdb);
629    //Dmsg0(500, "my_dbi_free_result finish\n");
630    
631 }
632
633 const char *my_dbi_strerror(B_DB *mdb) 
634 {
635    const char *errmsg;
636    
637    dbi_conn_error(mdb->db, &errmsg);
638         
639    return errmsg;
640 }
641
642 // TODO: make batch insert work with libdbi
643 #ifdef HAVE_BATCH_FILE_INSERT
644
645 int my_dbi_batch_start(JCR *jcr, B_DB *mdb)
646 {
647    char *query = "COPY batch FROM STDIN";
648
649    Dmsg0(500, "my_postgresql_batch_start started\n");
650
651    if (my_postgresql_query(mdb,
652                            "CREATE TEMPORARY TABLE batch ("
653                                "fileindex int,"
654                                "jobid int,"
655                                "path varchar,"
656                                "name varchar,"
657                                "lstat varchar,"
658                                "md5 varchar)") == 1)
659    {
660       Dmsg0(500, "my_postgresql_batch_start failed\n");
661       return 1;
662    }
663    
664    // We are starting a new query.  reset everything.
665    mdb->num_rows     = -1;
666    mdb->row_number   = -1;
667    mdb->field_number = -1;
668
669    my_postgresql_free_result(mdb);
670
671    for (int i=0; i < 10; i++) {
672       mdb->result = PQexec(mdb->db, query);
673       if (mdb->result) {
674          break;
675       }
676       bmicrosleep(5, 0);
677    }
678    if (!mdb->result) {
679       Dmsg1(50, "Query failed: %s\n", query);
680       goto bail_out;
681    }
682
683    mdb->status = PQresultStatus(mdb->result);
684    if (mdb->status == PGRES_COPY_IN) {
685       // how many fields in the set?
686       mdb->num_fields = (int) PQnfields(mdb->result);
687       mdb->num_rows   = 0;
688       mdb->status = 1;
689    } else {
690       Dmsg1(50, "Result status failed: %s\n", query);
691       goto bail_out;
692    }
693
694    Dmsg0(500, "my_postgresql_batch_start finishing\n");
695
696    return mdb->status;
697
698 bail_out:
699    Mmsg1(&mdb->errmsg, _("error starting batch mode: %s"), PQerrorMessage(mdb->db));
700    mdb->status = 0;
701    PQclear(mdb->result);
702    mdb->result = NULL;
703    return mdb->status;
704 }
705
706 /* set error to something to abort operation */
707 int my_dbi_batch_end(JCR *jcr, B_DB *mdb, const char *error)
708 {
709    int res;
710    int count=30;
711    Dmsg0(500, "my_postgresql_batch_end started\n");
712
713    if (!mdb) {                  /* no files ? */
714       return 0;
715    }
716
717    do { 
718       res = PQputCopyEnd(mdb->db, error);
719    } while (res == 0 && --count > 0);
720
721    if (res == 1) {
722       Dmsg0(500, "ok\n");
723       mdb->status = 1;
724    }
725    
726    if (res <= 0) {
727       Dmsg0(500, "we failed\n");
728       mdb->status = 0;
729       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
730    }
731    
732    Dmsg0(500, "my_postgresql_batch_end finishing\n");
733
734    return mdb->status;
735 }
736
737 int my_dbi_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
738 {
739    int res;
740    int count=30;
741    size_t len;
742    char *digest;
743    char ed1[50];
744
745    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
746    my_postgresql_copy_escape(mdb->esc_name, mdb->fname, mdb->fnl);
747
748    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
749    my_postgresql_copy_escape(mdb->esc_path, mdb->path, mdb->pnl);
750
751    if (ar->Digest == NULL || ar->Digest[0] == 0) {
752       digest = "0";
753    } else {
754       digest = ar->Digest;
755    }
756
757    len = Mmsg(mdb->cmd, "%u\t%s\t%s\t%s\t%s\t%s\n", 
758               ar->FileIndex, edit_int64(ar->JobId, ed1), mdb->esc_path, 
759               mdb->esc_name, ar->attr, digest);
760
761    do { 
762       res = PQputCopyData(mdb->db,
763                           mdb->cmd,
764                           len);
765    } while (res == 0 && --count > 0);
766
767    if (res == 1) {
768       Dmsg0(500, "ok\n");
769       mdb->changes++;
770       mdb->status = 1;
771    }
772
773    if (res <= 0) {
774       Dmsg0(500, "we failed\n");
775       mdb->status = 0;
776       Mmsg1(&mdb->errmsg, _("error ending batch mode: %s"), PQerrorMessage(mdb->db));
777    }
778
779    Dmsg0(500, "my_postgresql_batch_insert finishing\n");
780
781    return mdb->status;
782 }
783
784 #endif /* HAVE_BATCH_FILE_INSERT */
785
786 /* my_dbi_getisnull
787  * like PQgetisnull
788  * int PQgetisnull(const PGresult *res,
789  *              int row_number,
790  *               int column_number);
791  * 
792  *  use dbi_result_seek_row to search in result set
793  */
794 int my_dbi_getisnull(dbi_result *result, int row_number, int column_number) {
795    int i;
796
797    if(row_number == 0) {
798       row_number++;
799    }
800    
801    column_number++;
802    
803    if(dbi_result_seek_row(result, row_number)) {
804
805       i = dbi_result_field_is_null_idx(result,column_number);
806
807       return i;
808    } else {
809            
810       return 0;
811    }
812                 
813 }
814 /* my_dbi_getvalue 
815  * like PQgetvalue;
816  * char *PQgetvalue(const PGresult *res,
817  *                int row_number,
818  *                int column_number);
819  *
820  * use dbi_result_seek_row to search in result set
821  * use example to return only strings
822  */
823 char *my_dbi_getvalue(dbi_result *result, int row_number, unsigned int column_number) {
824
825    /* TODO: This is very bad, need refactoring */
826    POOLMEM *buf = get_pool_memory(PM_FNAME);
827    //const unsigned char *bufb = (unsigned char *)malloc(sizeof(unsigned char) * 300);
828    //const unsigned char *bufb;
829    const char *errmsg;
830    const char *field_name;     
831    unsigned short dbitype;
832    int32_t field_length = 0;
833    int64_t num;
834         
835    /* correct the index for dbi interface
836     * dbi index begins 1
837     * I prefer do not change others functions
838     */
839    Dmsg3(600, "my_dbi_getvalue pre-starting result '%p' row number '%d' column number '%d'\n", 
840                                 result, row_number, column_number);
841         
842    column_number++;
843
844    if(row_number == 0) {
845      row_number++;
846    }
847       
848    Dmsg3(600, "my_dbi_getvalue starting result '%p' row number '%d' column number '%d'\n", 
849                         result, row_number, column_number);
850    
851    if(dbi_result_seek_row(result, row_number)) {
852
853       field_name = dbi_result_get_field_name(result, column_number);
854       field_length = dbi_result_get_field_length(result, field_name);
855       dbitype = dbi_result_get_field_type_idx(result,column_number);
856                 
857       if(field_length) {
858           buf = check_pool_memory_size(buf, field_length + 1);
859       } else {
860           buf = check_pool_memory_size(buf, 50);
861       }
862       
863       Dmsg5(500, "my_dbi_getvalue result '%p' type '%d' \n field name '%s' "
864             "field_length '%d' field_length size '%d'\n", 
865             result, dbitype, field_name, field_length, sizeof_pool_memory(buf));
866       
867       switch (dbitype) {
868       case DBI_TYPE_INTEGER:
869          num = dbi_result_get_longlong(result, field_name);         
870          edit_int64(num, buf);
871          field_length = strlen(buf);
872          break;
873       case DBI_TYPE_STRING:
874          if(field_length) {
875             field_length = bsnprintf(buf, field_length + 1, "%s", 
876                   dbi_result_get_string(result, field_name));
877          } else {
878             buf[0] = 0;
879          }
880          break;
881       case DBI_TYPE_BINARY:
882          /* dbi_result_get_binary return a NULL pointer if value is empty
883          * following, change this to what Bacula espected
884          */
885          if(field_length) {
886             field_length = bsnprintf(buf, field_length + 1, "%s", 
887                   dbi_result_get_binary(result, field_name));
888          } else {
889             buf[0] = 0;
890          }
891          break;
892       case DBI_TYPE_DATETIME:
893          time_t last;
894          struct tm tm;
895          
896          last = dbi_result_get_datetime(result, field_name);
897          
898          if(last == -1) {
899                 field_length = bsnprintf(buf, 20, "0000-00-00 00:00:00"); 
900          } else {
901             (void)localtime_r(&last, &tm);
902             field_length = bsnprintf(buf, 20, "%04d-%02d-%02d %02d:%02d:%02d",
903                   (tm.tm_year + 1900), (tm.tm_mon + 1), tm.tm_mday,
904                   tm.tm_hour, tm.tm_min, tm.tm_sec);
905          }
906          break;
907       }
908
909    } else {
910       dbi_conn_error(dbi_result_get_conn(result), &errmsg);
911       Dmsg1(500, "my_dbi_getvalue error: %s\n", errmsg);
912    }
913                 
914    Dmsg3(500, "my_dbi_getvalue finish result '%p' num bytes '%d' data '%s'\n", 
915       result, field_length, buf);
916    return buf;
917 }
918
919 int my_dbi_sql_insert_id(B_DB *mdb, char *table_name)
920 {
921    /*
922     Obtain the current value of the sequence that
923     provides the serial value for primary key of the table.
924
925     currval is local to our session.  It is not affected by
926     other transactions.
927
928     Determine the name of the sequence.
929     PostgreSQL automatically creates a sequence using
930     <table>_<column>_seq.
931     At the time of writing, all tables used this format for
932     for their primary key: <table>id
933     Except for basefiles which has a primary key on baseid.
934     Therefore, we need to special case that one table.
935
936     everything else can use the PostgreSQL formula.
937    */
938    
939    char      sequence[30];  
940    uint64_t    id = 0;
941
942    if (mdb->db_type == SQL_TYPE_POSTGRESQL) {
943            
944       if (strcasecmp(table_name, "basefiles") == 0) {
945          bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
946       } else {
947          bstrncpy(sequence, table_name, sizeof(sequence));
948          bstrncat(sequence, "_",        sizeof(sequence));
949          bstrncat(sequence, table_name, sizeof(sequence));
950          bstrncat(sequence, "id",       sizeof(sequence));
951       }
952
953       bstrncat(sequence, "_seq", sizeof(sequence));
954       id = dbi_conn_sequence_last(mdb->db, NT_(sequence));
955    } else {
956       id = dbi_conn_sequence_last(mdb->db, NT_(table_name));
957    }
958    
959    return id;
960 }
961
962 #ifdef HAVE_BATCH_FILE_INSERT
963 const char *my_dbi_batch_lock_path_query = 
964    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
965
966
967 const char *my_dbi_batch_lock_filename_query = 
968    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
969
970 const char *my_dbi_batch_unlock_tables_query = "COMMIT";
971
972 const char *my_dbi_batch_fill_path_query = 
973    "INSERT INTO Path (Path) "
974     "SELECT a.Path FROM "
975      "(SELECT DISTINCT Path FROM batch) AS a "
976       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
977
978
979 const char *my_dbi_batch_fill_filename_query = 
980    "INSERT INTO Filename (Name) "
981     "SELECT a.Name FROM "
982      "(SELECT DISTINCT Name FROM batch) as a "
983       "WHERE NOT EXISTS "
984        "(SELECT Name FROM Filename WHERE Name = a.Name)";
985 #endif /* HAVE_BATCH_FILE_INSERT */
986
987 #endif /* HAVE_DBI */