]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/ingres.c
Revert "Added limit_filter which rewrites queries which use the LIMIT functions into"
[bacula/bacula] / bacula / src / cats / ingres.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to Ingres
30  *   These are Ingres specific routines
31  *
32  *    Stefan Reddig, June 2009
33  *    based uopn work done 
34  *    by Dan Langille, December 2003 and
35  *    by Kern Sibbald, March 2000
36  *
37  */
38
39
40 /* The following is necessary so that we do not include
41  * the dummy external definition of DB.
42  */
43 #define __SQL_C                       /* indicate that this is sql.c */
44
45 #include "bacula.h"
46 #include "cats.h"
47
48 #ifdef HAVE_INGRES
49
50 #include "myingres.h"
51
52 /* -----------------------------------------------------------------------
53  *
54  *   Ingres dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */  /* SRE: needed for ingres? */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "Ingres";
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb;
83
84    if (!db_user) {
85       Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
86       return NULL;
87    }
88    P(mutex);                          /* lock DB queue */
89    if (!mult_db_connections) {
90       /* Look to see if DB already open */
91       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
92          if (bstrcmp(mdb->db_name, db_name) &&
93              bstrcmp(mdb->db_address, db_address) &&
94              mdb->db_port == db_port) {
95             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
96             mdb->ref_count++;
97             V(mutex);
98             return mdb;                  /* already open */
99          }
100       }
101    }
102    Dmsg0(100, "db_open first time\n");
103    mdb = (B_DB *)malloc(sizeof(B_DB));
104    memset(mdb, 0, sizeof(B_DB));
105    mdb->db_name = bstrdup(db_name);
106    mdb->db_user = bstrdup(db_user);
107    if (db_password) {
108       mdb->db_password = bstrdup(db_password);
109    }
110    if (db_address) {
111       mdb->db_address  = bstrdup(db_address);
112    }
113    if (db_socket) {
114       mdb->db_socket   = bstrdup(db_socket);
115    }
116    mdb->db_port        = db_port;
117    mdb->have_insert_id = TRUE;
118    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
119    *mdb->errmsg        = 0;
120    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
121    mdb->cached_path    = get_pool_memory(PM_FNAME);
122    mdb->cached_path_id = 0;
123    mdb->ref_count      = 1;
124    mdb->fname          = get_pool_memory(PM_FNAME);
125    mdb->path           = get_pool_memory(PM_FNAME);
126    mdb->esc_name       = get_pool_memory(PM_FNAME);
127    mdb->esc_path      = get_pool_memory(PM_FNAME);
128    mdb->allow_transactions = mult_db_connections;
129    qinsert(&db_list, &mdb->bq);            /* put db in list */
130    V(mutex);
131    return mdb;
132 }
133
134 /* Check that the database correspond to the encoding we want */
135 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
136 {
137 /* SRE: TODO! Needed?
138  SQL_ROW row;
139    int ret=false;
140
141    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
142       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
143       return false;
144    }
145
146    if ((row = sql_fetch_row(mdb)) == NULL) {
147       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
148       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
149    } else {
150       ret = bstrcmp(row[0], "SQL_ASCII");
151       if (!ret) {
152          Mmsg(mdb->errmsg, 
153               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
154               mdb->db_name, row[0]);
155          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
156          Dmsg1(50, "%s", mdb->errmsg);
157       } 
158    }
159    return ret;
160 */
161     return true;
162 }
163
164 /*
165  * Check for errors in DBMS work
166  */
167 static int sql_check(B_DB *mdb)
168 {
169     int errorcode;
170
171     if ((errorcode = INGcheck()) < 0) {
172         /* TODO: fill mdb->errmsg */
173         Mmsg(mdb->errmsg, "Something went wrong - still searching!\n");
174     } else if (errorcode > 0) {
175         /* just a warning, proceed */
176     }
177     return errorcode;
178 }
179
180 /*
181  * Now actually open the database.  This can generate errors,
182  *   which are returned in the errmsg
183  *
184  * DO NOT close the database or free(mdb) here !!!!
185  */
186 int
187 db_open_database(JCR *jcr, B_DB *mdb)
188 {
189    int errstat;
190    char buf[10], *port;
191
192    P(mutex);
193    if (mdb->connected) {
194       V(mutex);
195       return 1;
196    }
197    mdb->connected = false;
198
199    if ((errstat=rwl_init(&mdb->lock)) != 0) {
200       berrno be;
201       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
202             be.bstrerror(errstat));
203       V(mutex);
204       return 0;
205    }
206
207    if (mdb->db_port) {
208       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
209       port = buf;
210    } else {
211       port = NULL;
212    }
213
214    mdb->db = INGconnectDB(mdb->db_name, mdb->db_user, mdb->db_password);
215
216    Dmsg0(50, "Ingres real CONNECT done\n");
217    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
218             mdb->db_password==NULL?"(NULL)":mdb->db_password);
219
220    if (sql_check(mdb)) {
221       Mmsg2(&mdb->errmsg, _("Unable to connect to Ingres server.\n"
222             "Database=%s User=%s\n"
223             "It is probably not running or your password is incorrect.\n"),
224              mdb->db_name, mdb->db_user);
225       V(mutex);
226       return 0;
227    }
228
229    mdb->connected = true;
230
231    if (!check_tables_version(jcr, mdb)) {
232       V(mutex);
233       return 0;
234    }
235
236    //sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
237    
238    /* check that encoding is SQL_ASCII */
239    check_database_encoding(jcr, mdb);
240
241    V(mutex);
242    return 1;
243 }
244
245 void
246 db_close_database(JCR *jcr, B_DB *mdb)
247 {
248    if (!mdb) {
249       return;
250    }
251    db_end_transaction(jcr, mdb);
252    P(mutex);
253    sql_free_result(mdb);
254    mdb->ref_count--;
255    if (mdb->ref_count == 0) {
256       qdchain(&mdb->bq);
257       if (mdb->connected && mdb->db) {
258          sql_close(mdb);
259       }
260       rwl_destroy(&mdb->lock);
261       free_pool_memory(mdb->errmsg);
262       free_pool_memory(mdb->cmd);
263       free_pool_memory(mdb->cached_path);
264       free_pool_memory(mdb->fname);
265       free_pool_memory(mdb->path);
266       free_pool_memory(mdb->esc_name);
267       free_pool_memory(mdb->esc_path);
268       if (mdb->db_name) {
269          free(mdb->db_name);
270       }
271       if (mdb->db_user) {
272          free(mdb->db_user);
273       }
274       if (mdb->db_password) {
275          free(mdb->db_password);
276       }
277       if (mdb->db_address) {
278          free(mdb->db_address);
279       }
280       if (mdb->db_socket) {
281          free(mdb->db_socket);
282       }
283       free(mdb);
284    }
285    V(mutex);
286 }
287
288 void db_check_backend_thread_safe()
289 { }
290
291
292
293 void db_thread_cleanup()
294 { }
295
296 /*
297  * Return the next unique index (auto-increment) for
298  * the given table.  Return NULL on error.
299  *
300  * For Ingres, NULL causes the auto-increment value     SRE: true?
301  *  to be updated.
302  */
303 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
304 {
305    strcpy(index, "NULL");
306    return 1;
307 }
308
309
310 /*
311  * Escape strings so that Ingres is happy
312  *
313  *   NOTE! len is the length of the old string. Your new
314  *         string must be long enough (max 2*old+1) to hold
315  *         the escaped output.
316  */
317 void
318 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
319 {
320    char *n, *o;
321
322    n = snew;
323    o = old;
324    while (len--) {
325       switch (*o) {
326       case '\'':
327          *n++ = '\'';
328          *n++ = '\'';
329          o++;
330          break;
331       case 0:
332          *n++ = '\\';
333          *n++ = 0;
334          o++;
335          break;
336       default:
337          *n++ = *o++;
338          break;
339       }
340    }
341    *n = 0;
342 }
343
344 /*
345  * Submit a general SQL command (cmd), and for each row returned,
346  *  the sqlite_handler is called with the ctx.
347  */
348 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
349 {
350    SQL_ROW row;
351
352    Dmsg0(500, "db_sql_query started\n");
353
354    db_lock(mdb);
355    if (sql_query(mdb, query) != 0) {
356       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
357       db_unlock(mdb);
358       Dmsg0(500, "db_sql_query failed\n");
359       return false;
360    }
361    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
362
363    if (result_handler != NULL) {
364       Dmsg0(500, "db_sql_query invoking handler\n");
365       if (mdb->result != NULL) {
366          int num_fields = sql_num_fields(mdb);
367
368          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
369          while ((row = sql_fetch_row(mdb)) != NULL) {
370
371             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
372             if (result_handler(ctx, num_fields, row))
373                break;
374          }
375
376         sql_free_result(mdb);
377       }
378    }
379    db_unlock(mdb);
380
381    Dmsg0(500, "db_sql_query finished\n");
382
383    return true;
384 }
385
386 /*
387  * Close database connection
388  */
389 void my_ingres_close(B_DB *mdb)
390 {
391     INGdisconnectDB(mdb->db);
392     //SRE: error handling? 
393 }
394
395 INGRES_ROW my_ingres_fetch_row(B_DB *mdb)
396 {
397    int j;
398    INGRES_ROW row = NULL; // by default, return NULL
399
400    if (!mdb->result) {
401       return row;
402    }
403    if (mdb->result->num_rows <= 0) {
404       return row;
405    }
406
407    Dmsg0(500, "my_ingres_fetch_row start\n");
408
409    if (!mdb->row || mdb->row_size < mdb->num_fields) {
410       int num_fields = mdb->num_fields;
411       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
412
413       if (mdb->row) {
414          Dmsg0(500, "my_ingres_fetch_row freeing space\n");
415          free(mdb->row);
416       }
417       num_fields += 20;                  /* add a bit extra */
418       mdb->row = (INGRES_ROW)malloc(sizeof(char *) * num_fields);
419       mdb->row_size = num_fields;
420
421       // now reset the row_number now that we have the space allocated
422       //mdb->row_number = 1;
423       mdb->row_number = 0;
424    }
425
426    // if still within the result set
427    //if (mdb->row_number <= mdb->num_rows) {
428    if (mdb->row_number < mdb->num_rows) {
429       Dmsg2(500, "my_ingres_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
430       // get each value from this row
431       for (j = 0; j < mdb->num_fields; j++) {
432          mdb->row[j] = INGgetvalue(mdb->result, mdb->row_number, j);
433          Dmsg2(500, "my_ingres_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
434       }
435       // increment the row number for the next call
436       mdb->row_number++;
437
438       row = mdb->row;
439    } else {
440       Dmsg2(500, "my_ingres_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
441    }
442
443    Dmsg1(500, "my_ingres_fetch_row finishes returning %p\n", row);
444
445    return row;
446 }
447
448
449 int my_ingres_max_length(B_DB *mdb, int field_num) {
450    //
451    // for a given column, find the max length
452    //
453    int max_length;
454    int i;
455    int this_length;
456
457    max_length = 0;
458    for (i = 0; i < mdb->num_rows; i++) {
459       if (INGgetisnull(mdb->result, i, field_num)) {
460           this_length = 4;        // "NULL"
461       } else {
462           this_length = cstrlen(INGgetvalue(mdb->result, i, field_num));
463       }
464
465       if (max_length < this_length) {
466           max_length = this_length;
467       }
468    }
469
470    return max_length;
471 }
472
473 INGRES_FIELD * my_ingres_fetch_field(B_DB *mdb)
474 {
475    int i;
476
477    Dmsg0(500, "my_ingres_fetch_field starts\n");
478
479    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
480       if (mdb->fields) {
481          free(mdb->fields);
482       }
483       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
484       mdb->fields = (INGRES_FIELD *)malloc(sizeof(INGRES_FIELD) * mdb->num_fields);
485       mdb->fields_size = mdb->num_fields;
486
487       for (i = 0; i < mdb->num_fields; i++) {
488          Dmsg1(500, "filling field %d\n", i);
489          strcpy(mdb->fields[i].name,INGfname(mdb->result, i));
490          mdb->fields[i].max_length = my_ingres_max_length(mdb, i);
491          mdb->fields[i].type       = INGftype(mdb->result, i);
492          mdb->fields[i].flags      = 0;
493
494          Dmsg4(500, "my_ingres_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
495             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
496             mdb->fields[i].flags);
497       } // end for
498    } // end if
499
500    // increment field number for the next time around
501
502    Dmsg0(500, "my_ingres_fetch_field finishes\n");
503    return &mdb->fields[mdb->field_number++];
504 }
505
506 void my_ingres_data_seek(B_DB *mdb, int row)
507 {
508    // set the row number to be returned on the next call
509    // to my_ingres_fetch_row
510    mdb->row_number = row;
511 }
512
513 void my_ingres_field_seek(B_DB *mdb, int field)
514 {
515    mdb->field_number = field;
516 }
517
518 /*
519  * Note, if this routine returns 1 (failure), Bacula expects
520  *  that no result has been stored.
521  *
522  *  Returns:  0  on success
523  *            1  on failure
524  *
525  */
526 int my_ingres_query(B_DB *mdb, const char *query)
527 {
528    Dmsg0(500, "my_ingres_query started\n");
529    // We are starting a new query.  reset everything.
530    mdb->num_rows     = -1;
531    mdb->row_number   = -1;
532    mdb->field_number = -1;
533
534    int cols = -1;
535
536    if (mdb->result) {
537       INGclear(mdb->result);  /* hmm, someone forgot to free?? */
538       mdb->result = NULL;
539    }
540
541    Dmsg1(500, "my_ingres_query starts with '%s'\n", query);
542
543    /* TODO: differentiate between SELECTs and other queries */
544
545    if ((cols = INGgetCols(query)) <= 0) {
546       if (cols < 0 ) {
547          Dmsg0(500,"my_ingres_query: neg.columns: no DML stmt!\n");
548       }
549       Dmsg0(500,"my_ingres_query (non SELECT) starting...\n");
550       /* non SELECT */
551       mdb->num_rows = INGexec(mdb->db, query);
552       if (INGcheck()) {
553         Dmsg0(500,"my_ingres_query (non SELECT) went wrong\n");
554         mdb->status = 1;
555       } else {
556         Dmsg0(500,"my_ingres_query (non SELECT) seems ok\n");
557         mdb->status = 0;
558       }
559    } else {
560       /* SELECT */
561       Dmsg0(500,"my_ingres_query (SELECT) starting...\n");
562       mdb->result = INGquery(mdb->db, query);
563       if ( mdb->result != NULL ) {
564         Dmsg1(500, "we have a result\n", query);
565
566         // how many fields in the set?
567         mdb->num_fields = (int)INGnfields(mdb->result);
568         Dmsg1(500, "we have %d fields\n", mdb->num_fields);
569
570         mdb->num_rows = INGntuples(mdb->result);
571         Dmsg1(500, "we have %d rows\n", mdb->num_rows);
572
573         mdb->status = 0;                  /* succeed */
574       } else {
575         Dmsg0(500, "No resultset...\n");
576         mdb->status = 1; /* failed */
577       }
578    }
579
580    Dmsg0(500, "my_ingres_query finishing\n");
581    return mdb->status;
582 }
583
584 void my_ingres_free_result(B_DB *mdb)
585 {
586    
587    db_lock(mdb);
588    if (mdb->result) {
589       INGclear(mdb->result);
590       mdb->result = NULL;
591    }
592
593    if (mdb->row) {
594       free(mdb->row);
595       mdb->row = NULL;
596    }
597
598    if (mdb->fields) {
599       free(mdb->fields);
600       mdb->fields = NULL;
601    }
602    db_unlock(mdb);
603 }
604
605 int my_ingres_currval(B_DB *mdb, const char *table_name)
606 {
607    // TODO!
608    return -1;
609 }
610
611 #ifdef HAVE_BATCH_FILE_INSERT
612
613 int my_ingres_batch_start(JCR *jcr, B_DB *mdb)
614 {
615     //TODO!
616    return ING_ERROR;
617 }
618
619 /* set error to something to abort operation */
620 int my_ingres_batch_end(JCR *jcr, B_DB *mdb, const char *error)
621 {
622     //TODO!
623    return ING_ERROR;
624 }
625
626 int my_ingres_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
627 {
628     //TODO!
629    return ING_ERROR;
630 }
631
632 #endif /* HAVE_BATCH_FILE_INSERT */
633
634 /*
635  * Escape strings so that Ingres is happy on COPY
636  *
637  *   NOTE! len is the length of the old string. Your new
638  *         string must be long enough (max 2*old+1) to hold
639  *         the escaped output.
640  */
641 char *my_ingres_copy_escape(char *dest, char *src, size_t len)
642 {
643    /* we have to escape \t, \n, \r, \ */
644    char c = '\0' ;
645
646    while (len > 0 && *src) {
647       switch (*src) {
648       case '\n':
649          c = 'n';
650          break;
651       case '\\':
652          c = '\\';
653          break;
654       case '\t':
655          c = 't';
656          break;
657       case '\r':
658          c = 'r';
659          break;
660       default:
661          c = '\0' ;
662       }
663
664       if (c) {
665          *dest = '\\';
666          dest++;
667          *dest = c;
668       } else {
669          *dest = *src;
670       }
671
672       len--;
673       src++;
674       dest++;
675    }
676
677    *dest = '\0';
678    return dest;
679 }
680
681 #ifdef HAVE_BATCH_FILE_INSERT
682 const char *my_ingres_batch_lock_path_query = 
683    "BEGIN; LOCK TABLE Path IN SHARE ROW EXCLUSIVE MODE";
684
685
686 const char *my_ingres_batch_lock_filename_query = 
687    "BEGIN; LOCK TABLE Filename IN SHARE ROW EXCLUSIVE MODE";
688
689 const char *my_ingres_batch_unlock_tables_query = "COMMIT";
690
691 const char *my_ingres_batch_fill_path_query = 
692    "INSERT INTO Path (Path) "
693     "SELECT a.Path FROM "
694      "(SELECT DISTINCT Path FROM batch) AS a "
695       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
696
697
698 const char *my_ingres_batch_fill_filename_query = 
699    "INSERT INTO Filename (Name) "
700     "SELECT a.Name FROM "
701      "(SELECT DISTINCT Name FROM batch) as a "
702       "WHERE NOT EXISTS "
703        "(SELECT Name FROM Filename WHERE Name = a.Name)";
704 #endif /* HAVE_BATCH_FILE_INSERT */
705
706 #endif /* HAVE_INGRES */