]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/ingres.c
Tweak version date
[bacula/bacula] / bacula / src / cats / ingres.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to Ingres
30  * These are Ingres specific routines
31  *
32  *    Stefan Reddig, June 2009 with help of Marco van Wieringen April 2010
33  *    based uopn work done 
34  *    by Dan Langille, December 2003 and
35  *    by Kern Sibbald, March 2000
36  *
37  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
38  */
39
40 #include "bacula.h"
41
42 #ifdef HAVE_INGRES
43
44 #include "cats.h"
45 #include "bdb_priv.h"
46 #include "myingres.h"
47 #include "bdb_ingres.h"
48 #include "lib/breg.h"
49
50 /* -----------------------------------------------------------------------
51  *
52  *   Ingres dependent defines and subroutines
53  *
54  * -----------------------------------------------------------------------
55  */
56
57 /*
58  * List of open databases.
59  */
60 static dlist *db_list = NULL;
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 struct B_DB_RWRULE {
65    int pattern_length;
66    char *search_pattern;
67    BREGEXP *rewrite_regexp;
68    bool trigger;
69 };
70
71 /*
72  * Create a new query filter.
73  */
74 static bool db_allocate_query_filter(JCR *jcr, alist *query_filters, int pattern_length,
75                                      const char *search_pattern, const char *filter)
76 {
77    B_DB_RWRULE *rewrite_rule;
78
79    rewrite_rule = (B_DB_RWRULE *)malloc(sizeof(B_DB_RWRULE));
80
81    rewrite_rule->pattern_length = pattern_length;
82    rewrite_rule->search_pattern = bstrdup(search_pattern);
83    rewrite_rule->rewrite_regexp = new_bregexp(filter);
84    rewrite_rule->trigger = false;
85
86    if (!rewrite_rule->rewrite_regexp) {
87       Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filter.\n"));
88       free(rewrite_rule->search_pattern);
89       free(rewrite_rule);
90       return false;
91    } else {
92       query_filters->append(rewrite_rule);
93       return true;
94    }
95 }
96
97 /*
98  * Create a stack of all filters that should be applied to a SQL query
99  * before submitting it to the database backend.
100  */
101 static inline alist *db_initialize_query_filters(JCR *jcr)
102 {
103    alist *query_filters;
104
105    query_filters = New(alist(10, not_owned_by_alist));
106
107    if (!query_filters) {
108       Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filters.\n"));
109       return NULL;
110    }
111
112    db_allocate_query_filter(jcr, query_filters, 6, "OFFSET",
113                             "/LIMIT ([0-9]+) OFFSET ([0-9]+)/OFFSET $2 FETCH NEXT $1 ROWS ONLY/ig");
114    db_allocate_query_filter(jcr, query_filters, 5, "LIMIT",
115                             "/LIMIT ([0-9]+)/FETCH FIRST $1 ROWS ONLY/ig");
116    db_allocate_query_filter(jcr, query_filters, 9, "TEMPORARY",
117                             "/CREATE TEMPORARY TABLE (.+)/DECLARE GLOBAL TEMPORARY TABLE $1 ON COMMIT PRESERVE ROWS WITH NORECOVERY/i");
118
119    return query_filters;
120 }
121
122 /*
123  * Free all query filters.
124  */
125 static inline void db_destroy_query_filters(alist *query_filters)
126 {
127    B_DB_RWRULE *rewrite_rule;
128
129    foreach_alist(rewrite_rule, query_filters) {
130       free_bregexp(rewrite_rule->rewrite_regexp);
131       free(rewrite_rule->search_pattern);
132       free(rewrite_rule);
133    }
134
135    delete query_filters;
136 }
137
138 B_DB_INGRES::B_DB_INGRES(JCR *jcr,
139                          const char *db_driver,
140                          const char *db_name,
141                          const char *db_user,
142                          const char *db_password,
143                          const char *db_address,
144                          int db_port,
145                          const char *db_socket,
146                          bool mult_db_connections,
147                          bool disable_batch_insert)
148 {
149    B_DB_INGRES *mdb;
150    int next_session_id = 0;
151
152    /*
153     * See what the next available session_id is.
154     * We first see what the highest session_id is used now.
155     */
156    if (db_list) {
157       foreach_dlist(mdb, db_list) {
158          if (mdb->m_session_id > next_session_id) {
159             next_session_id = mdb->m_session_id;
160          }
161       }
162    }
163
164    /*
165     * Initialize the parent class members.
166     */
167    m_db_interface_type  = SQL_INTERFACE_TYPE_INGRES;
168    m_db_type = SQL_TYPE_INGRES;
169    m_db_driver = bstrdup("ingres");
170    m_db_name = bstrdup(db_name);
171    m_db_user = bstrdup(db_user);
172    if (db_password) {
173       m_db_password = bstrdup(db_password);
174    }
175    if (db_address) {
176       m_db_address = bstrdup(db_address);
177    }
178    if (db_socket) {
179       m_db_socket = bstrdup(db_socket);
180    }
181    m_db_port = db_port;
182    if (disable_batch_insert) {
183       m_disabled_batch_insert = true;
184       m_have_batch_insert = false;
185    } else {
186       m_disabled_batch_insert = false;
187 #if defined(USE_BATCH_FILE_INSERT)
188       m_have_batch_insert = true;
189 #else
190       m_have_batch_insert = false;
191 #endif
192    }
193
194    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
195    *errmsg = 0;
196    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
197    cached_path = get_pool_memory(PM_FNAME);
198    cached_path_id = 0;
199    m_ref_count = 1;
200    fname = get_pool_memory(PM_FNAME);
201    path = get_pool_memory(PM_FNAME);
202    esc_name = get_pool_memory(PM_FNAME);
203    esc_path = get_pool_memory(PM_FNAME);
204    esc_obj = get_pool_memory(PM_FNAME);
205    m_allow_transactions = mult_db_connections;
206
207    /*
208     * Initialize the private members.
209     */
210    m_db_handle = NULL;
211    m_result = NULL;
212    m_explicit_commit = true;
213    m_session_id = ++next_session_id;
214    m_query_filters = db_initialize_query_filters(jcr);
215
216    /*
217     * Put the db in the list.
218     */
219    if (db_list == NULL) {
220       db_list = New(dlist(this, &this->m_link));
221    }
222    db_list->append(this);
223 }
224
225 B_DB_INGRES::~B_DB_INGRES()
226 {
227 }
228
229 /*
230  * Now actually open the database.  This can generate errors,
231  *   which are returned in the errmsg
232  *
233  * DO NOT close the database or delete mdb here !!!!
234  */
235 bool B_DB_INGRES::db_open_database(JCR *jcr)
236 {
237    bool retval = false;
238    int errstat;
239
240    P(mutex);
241    if (m_connected) {
242       retval = true;
243       goto bail_out;
244    }
245
246    if ((errstat=rwl_init(&m_lock)) != 0) {
247       berrno be;
248       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
249             be.bstrerror(errstat));
250       goto bail_out;
251    }
252
253    m_db_handle = INGconnectDB(m_db_name, m_db_user, m_db_password, m_session_id);
254
255    Dmsg0(50, "Ingres real CONNECT done\n");
256    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
257               m_db_password == NULL ? "(NULL)" : m_db_password);
258
259    if (!m_db_handle) {
260       Mmsg2(&errmsg, _("Unable to connect to Ingres server.\n"
261             "Database=%s User=%s\n"
262             "It is probably not running or your password is incorrect.\n"),
263              m_db_name, m_db_user);
264       goto bail_out;
265    }
266
267    m_connected = true;
268
269    INGsetDefaultLockingMode(m_db_handle);
270
271    if (!check_tables_version(jcr, this)) {
272       goto bail_out;
273    }
274
275    retval = true;
276
277 bail_out:
278    V(mutex);
279    return retval;
280 }
281
282 void B_DB_INGRES::db_close_database(JCR *jcr)
283 {
284    db_end_transaction(jcr);
285    P(mutex);
286    m_ref_count--;
287    if (m_ref_count == 0) {
288       sql_free_result();
289       db_list->remove(this);
290       if (m_connected && m_db_handle) {
291          INGdisconnectDB(m_db_handle);
292       }
293       if (m_query_filters) {
294          db_destroy_query_filters(m_query_filters);
295       }
296       rwl_destroy(&m_lock);
297       free_pool_memory(errmsg);
298       free_pool_memory(cmd);
299       free_pool_memory(cached_path);
300       free_pool_memory(fname);
301       free_pool_memory(path);
302       free_pool_memory(esc_name);
303       free_pool_memory(esc_path);
304       free_pool_memory(esc_obj);
305       free(m_db_driver);
306       free(m_db_name);
307       free(m_db_user);
308       if (m_db_password) {
309          free(m_db_password);
310       }
311       if (m_db_address) {
312          free(m_db_address);
313       }
314       if (m_db_socket) {
315          free(m_db_socket);
316       }
317       delete this;
318       if (db_list->size() == 0) {
319          delete db_list;
320          db_list = NULL;
321       }
322    }
323    V(mutex);
324 }
325
326 void B_DB_INGRES::db_thread_cleanup(void)
327 {
328 }
329
330 /*
331  * Escape strings so that Ingres is happy
332  *
333  *   NOTE! len is the length of the old string. Your new
334  *         string must be long enough (max 2*old+1) to hold
335  *         the escaped output.
336  */
337 void B_DB_INGRES::db_escape_string(JCR *jcr, char *snew, char *old, int len)
338 {
339    char *n, *o;
340
341    n = snew;
342    o = old;
343    while (len--) {
344       switch (*o) {
345       case '\'':
346          *n++ = '\'';
347          *n++ = '\'';
348          o++;
349          break;
350       case 0:
351          *n++ = '\\';
352          *n++ = 0;
353          o++;
354          break;
355       default:
356          *n++ = *o++;
357          break;
358       }
359    }
360    *n = 0;
361 }
362
363 /*
364  * Escape binary so that Ingres is happy
365  *
366  *   NOTE! Need to be implemented (escape \0)
367  *
368  */
369 char *B_DB_INGRES::db_escape_object(JCR *jcr, char *old, int len)
370 {
371    char *n, *o;
372
373    n = esc_obj = check_pool_memory_size(esc_obj, len*2+1);
374    o = old;
375    while (len--) {
376       switch (*o) {
377       case '\'':
378          *n++ = '\'';
379          *n++ = '\'';
380          o++;
381          break;
382       case 0:
383          *n++ = '\\';
384          *n++ = 0;
385          o++;
386          break;
387       default:
388          *n++ = *o++;
389          break;
390       }
391    }
392    *n = 0;
393    return esc_obj;
394 }
395
396 /*
397  * Unescape binary object so that Ingres is happy
398  *
399  * TODO: need to be implemented (escape \0)
400  */
401 void B_DB_INGRES::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
402                                      POOLMEM **dest, int32_t *dest_len)
403 {
404    if (!from) {
405       *dest[0] = 0;
406       *dest_len = 0;
407       return;
408    }
409    *dest = check_pool_memory_size(*dest, expected_len+1);
410    *dest_len = expected_len;
411    memcpy(*dest, from, expected_len);
412    (*dest)[expected_len]=0;
413 }
414
415 /*
416  * Start a transaction. This groups inserts and makes things
417  * much more efficient. Usually started when inserting
418  * file attributes.
419  */
420 void B_DB_INGRES::db_start_transaction(JCR *jcr)
421 {
422    if (!jcr->attr) {
423       jcr->attr = get_pool_memory(PM_FNAME);
424    }
425    if (!jcr->ar) {
426       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
427    }
428
429    if (!m_allow_transactions) {
430       return;
431    }
432
433    db_lock(this);
434    /* Allow only 25,000 changes per transaction */
435    if (m_transaction && changes > 25000) {
436       db_end_transaction(jcr);
437    }
438    if (!m_transaction) {
439       sql_query("BEGIN");  /* begin transaction */
440       Dmsg0(400, "Start Ingres transaction\n");
441       m_transaction = true;
442    }
443    db_unlock(this);
444 }
445
446 void B_DB_INGRES::db_end_transaction(JCR *jcr)
447 {
448    if (jcr && jcr->cached_attribute) {
449       Dmsg0(400, "Flush last cached attribute.\n");
450       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
451          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
452       }
453       jcr->cached_attribute = false;
454    }
455
456    if (!m_allow_transactions) {
457       return;
458    }
459
460    db_lock(this);
461    if (m_transaction) {
462       sql_query("COMMIT"); /* end transaction */
463       m_transaction = false;
464       Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
465    }
466    changes = 0;
467    db_unlock(this);
468 }
469
470 /*
471  * Submit a general SQL command (cmd), and for each row returned,
472  *  the result_handler is called with the ctx.
473  */
474 bool B_DB_INGRES::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
475 {
476    SQL_ROW row;
477    bool retval = true;
478
479    Dmsg1(500, "db_sql_query starts with %s\n", query);
480
481    db_lock(this);
482    if (!sql_query(query, QF_STORE_RESULT)) {
483       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
484       Dmsg0(500, "db_sql_query failed\n");
485       retval = false;
486       goto bail_out;
487    }
488
489    if (result_handler != NULL) {
490       Dmsg0(500, "db_sql_query invoking handler\n");
491       while ((row = sql_fetch_row()) != NULL) {
492          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
493          if (result_handler(ctx, m_num_fields, row))
494             break;
495       }
496       sql_free_result();
497    }
498
499    Dmsg0(500, "db_sql_query finished\n");
500
501 bail_out:
502    db_unlock(this);
503    return retval;
504 }
505
506 /*
507  * Note, if this routine returns false (failure), Bacula expects
508  * that no result has been stored.
509  *
510  *  Returns:  true  on success
511  *            false on failure
512  *
513  */
514 bool B_DB_INGRES::sql_query(const char *query, int flags)
515 {
516    int cols;
517    char *cp, *bp;
518    char *dup_query, *new_query;
519    bool retval = true;
520    bool start_of_transaction = false;
521    bool end_of_transaction = false;
522    B_DB_RWRULE *rewrite_rule;
523
524    Dmsg1(500, "query starts with '%s'\n", query);
525    /*
526     * We always make a private copy of the query as we are doing serious
527     * rewrites in this engine. When running the private copy through the
528     * different query filters we loose the orginal private copy so we
529     * first make a extra reference to it so we can free it on exit from the
530     * function.
531     */
532    dup_query = new_query = bstrdup(query);
533
534    /*
535     * Iterate over the query string and perform any needed operations.
536     * We use a sliding window over the query string where bp points to
537     * the previous position in the query and cp to the current position
538     * in the query.
539     */
540    bp = new_query;
541    while (bp != NULL) {
542       if ((cp = strchr(bp, ' ')) != NULL) {
543          *cp++;
544       }
545
546       if (!strncasecmp(bp, "BEGIN", 5)) {
547          /*
548           * This is the start of a transaction.
549           * Inline copy the rest of the query over the BEGIN keyword.
550           */
551          if (cp) {
552             strcpy(bp, cp);
553          } else {
554             *bp = '\0';
555          }
556          start_of_transaction = true;
557       } else if (!strncasecmp(bp, "COMMIT", 6) && (cp == NULL || strncasecmp(cp, "PRESERVE", 8))) {
558          /*
559           * This is the end of a transaction. We cannot check for just the COMMIT
560           * keyword as a DECLARE of an tempory table also has the word COMMIT in it
561           * but its followed by the word PRESERVE.
562           * Inline copy the rest of the query over the COMMIT keyword.
563           */
564          if (cp) {
565             strcpy(bp, cp);
566          } else {
567             *bp = '\0';
568          }
569          end_of_transaction = true;
570       }
571
572       /*
573        * See what query filter might match.
574        */
575       foreach_alist(rewrite_rule, m_query_filters) {
576          if (!strncasecmp(bp, rewrite_rule->search_pattern, rewrite_rule->pattern_length)) {
577             rewrite_rule->trigger = true;
578          }
579       }
580
581       /*
582        * Slide window.
583        */
584       bp = cp;
585    }
586
587    /*
588     * Run the query through all query filters that apply e.g. have the trigger set in the
589     * previous loop.
590     */
591    foreach_alist(rewrite_rule, m_query_filters) {
592       if (rewrite_rule->trigger) {
593          new_query = rewrite_rule->rewrite_regexp->replace(new_query);
594          rewrite_rule->trigger = false;
595       }
596    }
597
598    if (start_of_transaction) {
599       Dmsg0(500,"sql_query: Start of transaction\n");
600       m_explicit_commit = false;
601    }
602
603    /*
604     * See if there is any query left after filtering for certain keywords.
605     */
606    bp = new_query;
607    while (bp != NULL && strlen(bp) > 0) {
608       /*
609        * We are starting a new query.  reset everything.
610        */
611       m_num_rows     = -1;
612       m_row_number   = -1;
613       m_field_number = -1;
614
615       if (m_result) {
616          INGclear(m_result);  /* hmm, someone forgot to free?? */
617          m_result = NULL;
618       }
619
620       /*
621        * See if this is a multi-statement query. We split a multi-statement query
622        * on the semi-column and feed the individual queries to the Ingres functions.
623        * We use a sliding window over the query string where bp points to
624        * the previous position in the query and cp to the current position
625        * in the query.
626        */
627       if ((cp = strchr(bp, ';')) != NULL) {
628          *cp++ = '\0';
629       }
630
631       Dmsg1(500, "sql_query after rewrite continues with '%s'\n", bp);
632
633       /*
634        * See if we got a store_result hint which could mean we are running a select.
635        * If flags has QF_STORE_RESULT not set we are sure its not a query that we
636        * need to store anything for.
637        */
638       if (flags & QF_STORE_RESULT) {
639          cols = INGgetCols(m_db_handle, bp, m_explicit_commit);
640       } else {
641          cols = 0;
642       }
643
644       if (cols <= 0) {
645          if (cols < 0 ) {
646             Dmsg0(500,"sql_query: neg.columns: no DML stmt!\n");
647             retval = false;
648             goto bail_out;
649          }
650          Dmsg0(500,"sql_query (non SELECT) starting...\n");
651          /*
652           * non SELECT
653           */
654          m_num_rows = INGexec(m_db_handle, bp, m_explicit_commit);
655          if (m_num_rows == -1) {
656            Dmsg0(500,"sql_query (non SELECT) went wrong\n");
657            retval = false;
658            goto bail_out;
659          } else {
660            Dmsg0(500,"sql_query (non SELECT) seems ok\n");
661          }
662       } else {
663          /*
664           * SELECT
665           */
666          Dmsg0(500,"sql_query (SELECT) starting...\n");
667          m_result = INGquery(m_db_handle, bp, m_explicit_commit);
668          if (m_result != NULL) {
669             Dmsg0(500, "we have a result\n");
670
671             /*
672              * How many fields in the set?
673              */
674             m_num_fields = (int)INGnfields(m_result);
675             Dmsg1(500, "we have %d fields\n", m_num_fields);
676
677             m_num_rows = INGntuples(m_result);
678             Dmsg1(500, "we have %d rows\n", m_num_rows);
679          } else {
680             Dmsg0(500, "No resultset...\n");
681             retval = false;
682             goto bail_out;
683          }
684       }
685
686       bp = cp;
687    }
688
689 bail_out:
690    if (end_of_transaction) {
691       Dmsg0(500,"sql_query: End of transaction, commiting work\n");
692       m_explicit_commit = true;
693       INGcommit(m_db_handle);
694    }
695
696    free(dup_query);
697    Dmsg0(500, "sql_query finishing\n");
698
699    return retval;
700 }  
701
702 void B_DB_INGRES::sql_free_result(void)
703 {
704    db_lock(this);
705    if (m_result) {
706       INGclear(m_result);
707       m_result = NULL;
708    }
709    if (m_rows) {
710       free(m_rows);
711       m_rows = NULL;
712    }
713    if (m_fields) {
714       free(m_fields);
715       m_fields = NULL;
716    }
717    m_num_rows = m_num_fields = 0;
718    db_unlock(this);
719 }
720
721 SQL_ROW B_DB_INGRES::sql_fetch_row(void)
722 {
723    int j;
724    SQL_ROW row = NULL; /* by default, return NULL */
725
726    if (!m_result) {
727       return row;
728    }
729    if (m_result->num_rows <= 0) {
730       return row;
731    }
732
733    Dmsg0(500, "sql_fetch_row start\n");
734
735    if (!m_rows || m_rows_size < m_num_fields) {
736       if (m_rows) {
737          Dmsg0(500, "sql_fetch_row freeing space\n");
738          free(m_rows);
739       }
740       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
741       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
742       m_rows_size = m_num_fields;
743
744       /*
745        * Now reset the row_number now that we have the space allocated
746        */
747       m_row_number = 0;
748    }
749
750    /*
751     * If still within the result set
752     */
753    if (m_row_number < m_num_rows) {
754       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
755       /*
756        * Get each value from this row
757        */
758       for (j = 0; j < m_num_fields; j++) {
759          m_rows[j] = INGgetvalue(m_result, m_row_number, j);
760          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
761       }
762       /*
763        * Increment the row number for the next call
764        */
765       m_row_number++;
766
767       row = m_rows;
768    } else {
769       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
770    }
771
772    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
773
774    return row;
775 }
776
777 const char *B_DB_INGRES::sql_strerror(void)
778 {
779    return INGerrorMessage(m_db_handle);
780 }
781
782 void B_DB_INGRES::sql_data_seek(int row)
783 {
784    /*
785     * Set the row number to be returned on the next call to sql_fetch_row
786     */
787    m_row_number = row;
788 }
789
790 int B_DB_INGRES::sql_affected_rows(void)
791 {
792    return m_num_rows;
793 }
794
795 /*
796  * First execute the insert query and then retrieve the currval.
797  * By setting transaction to true we make it an atomic transaction
798  * and as such we can get the currval after which we commit if
799  * transaction is false. This way things are an atomic operation
800  * for Ingres and things work. We save the current transaction status
801  * and set transaction in the mdb to true and at the end of this
802  * function we restore the actual transaction status.
803  */
804 uint64_t B_DB_INGRES::sql_insert_autokey_record(const char *query, const char *table_name)
805 {
806    char sequence[64];
807    char getkeyval_query[256];
808    char *currval;
809    uint64_t id = 0;
810    bool current_explicit_commit;
811
812    /*
813     * Save the current transaction status and pretend we are in a transaction.
814     */
815    current_explicit_commit = m_explicit_commit;
816    m_explicit_commit = false;
817
818    /*
819     * Execute the INSERT query.
820     */
821    m_num_rows = INGexec(m_db_handle, query, m_explicit_commit);
822    if (m_num_rows == -1) {
823       goto bail_out;
824    }
825
826    changes++;
827
828    /*
829     * Obtain the current value of the sequence that
830     * provides the serial value for primary key of the table.
831     *
832     * currval is local to our session. It is not affected by
833     * other transactions.
834     *
835     * Determine the name of the sequence.
836     * As we name all sequences as <table>_seq this is easy.
837     */
838    bstrncpy(sequence, table_name, sizeof(sequence));
839    bstrncat(sequence, "_seq", sizeof(sequence));
840
841    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT %s.currval FROM %s", sequence, table_name);
842
843    if (m_result) {
844       INGclear(m_result);
845       m_result = NULL;
846    }
847    m_result = INGquery(m_db_handle, getkeyval_query, m_explicit_commit);
848
849    if (!m_result) {
850       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
851       goto bail_out;
852    }
853
854    Dmsg0(500, "exec done");
855
856    currval = INGgetvalue(m_result, 0, 0);
857    if (currval) {
858       id = str_to_uint64(currval);
859    }
860
861    INGclear(m_result);
862    m_result = NULL;
863
864 bail_out:
865    /*
866     * Restore the actual explicit_commit status.
867     */
868    m_explicit_commit = current_explicit_commit;
869
870    /*
871     * Commit if explicit_commit is not set.
872     */
873    if (m_explicit_commit) {
874       INGcommit(m_db_handle);
875    }
876
877    return id;
878 }
879
880 SQL_FIELD *B_DB_INGRES::sql_fetch_field(void)
881 {
882    int i, j;
883    int max_length;
884    int this_length;
885
886    if (!m_fields || m_fields_size < m_num_fields) {
887       if (m_fields) {
888          free(m_fields);
889          m_fields = NULL;
890       }
891       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
892       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
893       m_fields_size = m_num_fields;
894
895       for (i = 0; i < m_num_fields; i++) {
896          Dmsg1(500, "filling field %d\n", i);
897          m_fields[i].name = INGfname(m_result, i);
898          m_fields[i].type = INGftype(m_result, i);
899          m_fields[i].flags = 0;
900
901          /*
902           * For a given column, find the max length.
903           */
904          max_length = 0;
905          for (j = 0; j < m_num_rows; j++) {
906             if (INGgetisnull(m_result, j, i)) {
907                 this_length = 4;        /* "NULL" */
908             } else {
909                 this_length = cstrlen(INGgetvalue(m_result, j, i));
910             }
911
912             if (max_length < this_length) {
913                max_length = this_length;
914             }
915          }
916          m_fields[i].max_length = max_length;
917
918          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
919                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
920       }
921    }
922
923    /*
924     * Increment field number for the next time around
925     */
926    return &m_fields[m_field_number++];
927 }
928
929 bool B_DB_INGRES::sql_field_is_not_null(int field_type)
930 {
931    switch (field_type) {
932    case 1:
933       return true;
934    default:
935       return false;
936    }
937 }
938
939 bool B_DB_INGRES::sql_field_is_numeric(int field_type)
940 {
941    /*
942     * See ${II_SYSTEM}/ingres/files/eqsqlda.h for numeric types.
943     */
944    switch (field_type) {
945    case IISQ_DEC_TYPE:
946    case IISQ_INT_TYPE:
947    case IISQ_FLT_TYPE:
948       return true;
949    default:
950       return false;
951    }
952 }
953
954 /*
955  * Escape strings so that Ingres is happy on COPY
956  *
957  *   NOTE! len is the length of the old string. Your new
958  *         string must be long enough (max 2*old+1) to hold
959  *         the escaped output.
960  */
961 static char *ingres_copy_escape(char *dest, char *src, size_t len)
962 {
963    /* we have to escape \t, \n, \r, \ */
964    char c = '\0' ;
965
966    while (len > 0 && *src) {
967       switch (*src) {
968       case '\n':
969          c = 'n';
970          break;
971       case '\\':
972          c = '\\';
973          break;
974       case '\t':
975          c = 't';
976          break;
977       case '\r':
978          c = 'r';
979          break;
980       default:
981          c = '\0' ;
982       }
983
984       if (c) {
985          *dest = '\\';
986          dest++;
987          *dest = c;
988       } else {
989          *dest = *src;
990       }
991
992       len--;
993       src++;
994       dest++;
995    }
996
997    *dest = '\0';
998    return dest;
999 }
1000
1001 /* 
1002  * Returns true if OK
1003  *         false if failed
1004  */
1005 bool B_DB_INGRES::sql_batch_start(JCR *jcr)
1006 {
1007    bool ok;
1008
1009    db_lock(this);
1010    ok = sql_query("DECLARE GLOBAL TEMPORARY TABLE batch ("
1011                            "FileIndex INTEGER,"
1012                            "JobId INTEGER,"
1013                            "Path VARBYTE(32000),"
1014                            "Name VARBYTE(32000),"
1015                            "LStat VARBYTE(255),"
1016                            "MD5 VARBYTE(255),"
1017                            "DeltaSeq SMALLINT)"
1018                            " ON COMMIT PRESERVE ROWS WITH NORECOVERY");
1019    db_unlock(this);
1020    return ok;
1021 }
1022
1023 /* 
1024  * Returns true if OK
1025  *         false if failed
1026  */
1027 bool B_DB_INGRES::sql_batch_end(JCR *jcr, const char *error)
1028 {
1029    m_status = 0;
1030    return true;
1031 }
1032
1033 /* 
1034  * Returns true if OK
1035  *         false if failed
1036  */
1037 bool B_DB_INGRES::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1038 {
1039    size_t len;
1040    const char *digest;
1041    char ed1[50];
1042
1043    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1044    db_escape_string(jcr, esc_name, fname, fnl);
1045
1046    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1047    db_escape_string(jcr, esc_path, path, pnl);
1048
1049    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1050       digest = "0";
1051    } else {
1052       digest = ar->Digest;
1053    }
1054
1055    len = Mmsg(cmd, "INSERT INTO batch VALUES "
1056                    "(%u,%s,'%s','%s','%s','%s',%u)",
1057                    ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1058                    esc_name, ar->attr, digest, ar->DeltaSeq);
1059
1060    return sql_query(cmd);
1061 }
1062
1063 /*
1064  * Initialize database data structure. In principal this should
1065  * never have errors, or it is really fatal.
1066  */
1067 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1068                        const char *db_password, const char *db_address, int db_port,
1069                        const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1070 {
1071    B_DB_INGRES *mdb = NULL;
1072
1073    if (!db_user) {
1074       Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
1075       return NULL;
1076    }
1077
1078    P(mutex);                          /* lock DB queue */
1079    if (db_list && !mult_db_connections) {
1080       /*
1081        * Look to see if DB already open
1082        */
1083       foreach_dlist(mdb, db_list) {
1084          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1085             Dmsg1(100, "DB REopen %s\n", db_name);
1086             mdb->increment_refcount();
1087             goto bail_out;
1088          }
1089       }
1090    }
1091
1092    Dmsg0(100, "db_init_database first time\n");
1093    mdb = New(B_DB_INGRES(jcr, db_driver, db_name, db_user, db_password, db_address,
1094                          db_port, db_socket, mult_db_connections, disable_batch_insert));
1095
1096 bail_out:
1097    V(mutex);
1098    return mdb;
1099 }
1100 #endif /* HAVE_INGRES */