]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/ingres.c
Tweak updates
[bacula/bacula] / bacula / src / cats / ingres.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to Ingres
30  * These are Ingres specific routines
31  *
32  *    Stefan Reddig, June 2009 with help of Marco van Wieringen April 2010
33  *    based uopn work done 
34  *    by Dan Langille, December 2003 and
35  *    by Kern Sibbald, March 2000
36  *
37  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
38  */
39
40 #include "bacula.h"
41
42 #ifdef HAVE_INGRES
43
44 #include "cats.h"
45 #include "bdb_priv.h"
46 #include "myingres.h"
47 #include "bdb_ingres.h"
48 #include "lib/breg.h"
49
50 /* -----------------------------------------------------------------------
51  *
52  *   Ingres dependent defines and subroutines
53  *
54  * -----------------------------------------------------------------------
55  */
56
57 /*
58  * List of open databases.
59  */
60 static dlist *db_list = NULL;
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 struct B_DB_RWRULE {
65    int pattern_length;
66    char *search_pattern;
67    BREGEXP *rewrite_regexp;
68    bool trigger;
69 };
70
71 /*
72  * Create a new query filter.
73  */
74 static bool db_allocate_query_filter(JCR *jcr, alist *query_filters, int pattern_length,
75                                      const char *search_pattern, const char *filter)
76 {
77    B_DB_RWRULE *rewrite_rule;
78
79    rewrite_rule = (B_DB_RWRULE *)malloc(sizeof(B_DB_RWRULE));
80
81    rewrite_rule->pattern_length = pattern_length;
82    rewrite_rule->search_pattern = bstrdup(search_pattern);
83    rewrite_rule->rewrite_regexp = new_bregexp(filter);
84    rewrite_rule->trigger = false;
85
86    if (!rewrite_rule->rewrite_regexp) {
87       Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filter.\n"));
88       free(rewrite_rule->search_pattern);
89       free(rewrite_rule);
90       return false;
91    } else {
92       query_filters->append(rewrite_rule);
93       return true;
94    }
95 }
96
97 /*
98  * Create a stack of all filters that should be applied to a SQL query
99  * before submitting it to the database backend.
100  */
101 static inline alist *db_initialize_query_filters(JCR *jcr)
102 {
103    alist *query_filters;
104
105    query_filters = New(alist(10, not_owned_by_alist));
106
107    if (!query_filters) {
108       Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filters.\n"));
109       return NULL;
110    }
111
112    db_allocate_query_filter(jcr, query_filters, 6, "OFFSET",
113                             "/LIMIT ([0-9]+) OFFSET ([0-9]+)/OFFSET $2 FETCH NEXT $1 ROWS ONLY/ig");
114    db_allocate_query_filter(jcr, query_filters, 5, "LIMIT",
115                             "/LIMIT ([0-9]+)/FETCH FIRST $1 ROWS ONLY/ig");
116    db_allocate_query_filter(jcr, query_filters, 9, "TEMPORARY",
117                             "/CREATE TEMPORARY TABLE (.+)/DECLARE GLOBAL TEMPORARY TABLE $1 ON COMMIT PRESERVE ROWS WITH NORECOVERY/i");
118
119    return query_filters;
120 }
121
122 /*
123  * Free all query filters.
124  */
125 static inline void db_destroy_query_filters(alist *query_filters)
126 {
127    B_DB_RWRULE *rewrite_rule;
128
129    foreach_alist(rewrite_rule, query_filters) {
130       free_bregexp(rewrite_rule->rewrite_regexp);
131       free(rewrite_rule->search_pattern);
132       free(rewrite_rule);
133    }
134
135    delete query_filters;
136 }
137
138 B_DB_INGRES::B_DB_INGRES(JCR *jcr,
139                          const char *db_driver,
140                          const char *db_name,
141                          const char *db_user,
142                          const char *db_password,
143                          const char *db_address,
144                          int db_port,
145                          const char *db_socket,
146                          bool mult_db_connections,
147                          bool disable_batch_insert)
148 {
149    B_DB_INGRES *mdb;
150    int next_session_id = 0;
151
152    /*
153     * See what the next available session_id is.
154     * We first see what the highest session_id is used now.
155     */
156    if (db_list) {
157       foreach_dlist(mdb, db_list) {
158          if (mdb->m_session_id > next_session_id) {
159             next_session_id = mdb->m_session_id;
160          }
161       }
162    }
163
164    /*
165     * Initialize the parent class members.
166     */
167    m_db_interface_type  = SQL_INTERFACE_TYPE_INGRES;
168    m_db_type = SQL_TYPE_INGRES;
169    m_db_driver = bstrdup("ingres");
170    m_db_name = bstrdup(db_name);
171    m_db_user = bstrdup(db_user);
172    if (db_password) {
173       m_db_password = bstrdup(db_password);
174    }
175    if (db_address) {
176       m_db_address = bstrdup(db_address);
177    }
178    if (db_socket) {
179       m_db_socket = bstrdup(db_socket);
180    }
181    m_db_port = db_port;
182    if (disable_batch_insert) {
183       m_disabled_batch_insert = true;
184       m_have_batch_insert = false;
185    } else {
186       m_disabled_batch_insert = false;
187 #if defined(USE_BATCH_FILE_INSERT)
188       m_have_batch_insert = true;
189 #else
190       m_have_batch_insert = false;
191 #endif
192    }
193
194    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
195    *errmsg = 0;
196    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
197    cached_path = get_pool_memory(PM_FNAME);
198    cached_path_id = 0;
199    m_ref_count = 1;
200    fname = get_pool_memory(PM_FNAME);
201    path = get_pool_memory(PM_FNAME);
202    esc_name = get_pool_memory(PM_FNAME);
203    esc_path = get_pool_memory(PM_FNAME);
204    esc_obj = get_pool_memory(PM_FNAME);
205    m_allow_transactions = mult_db_connections;
206
207    /* At this time, when mult_db_connections == true, this is for 
208     * specific console command such as bvfs or batch mode, and we don't
209     * want to share a batch mode or bvfs. In the future, we can change
210     * the creation function to add this parameter.
211     */
212    m_dedicated = mult_db_connections; 
213
214    /*
215     * Initialize the private members.
216     */
217    m_db_handle = NULL;
218    m_result = NULL;
219    m_explicit_commit = true;
220    m_session_id = ++next_session_id;
221    m_query_filters = db_initialize_query_filters(jcr);
222
223    /*
224     * Put the db in the list.
225     */
226    if (db_list == NULL) {
227       db_list = New(dlist(this, &this->m_link));
228    }
229    db_list->append(this);
230 }
231
232 B_DB_INGRES::~B_DB_INGRES()
233 {
234 }
235
236 /*
237  * Now actually open the database.  This can generate errors,
238  *   which are returned in the errmsg
239  *
240  * DO NOT close the database or delete mdb here !!!!
241  */
242 bool B_DB_INGRES::db_open_database(JCR *jcr)
243 {
244    bool retval = false;
245    int errstat;
246
247    P(mutex);
248    if (m_connected) {
249       retval = true;
250       goto bail_out;
251    }
252
253    if ((errstat=rwl_init(&m_lock)) != 0) {
254       berrno be;
255       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
256             be.bstrerror(errstat));
257       goto bail_out;
258    }
259
260    m_db_handle = INGconnectDB(m_db_name, m_db_user, m_db_password, m_session_id);
261
262    Dmsg0(50, "Ingres real CONNECT done\n");
263    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
264               m_db_password == NULL ? "(NULL)" : m_db_password);
265
266    if (!m_db_handle) {
267       Mmsg2(&errmsg, _("Unable to connect to Ingres server.\n"
268             "Database=%s User=%s\n"
269             "It is probably not running or your password is incorrect.\n"),
270              m_db_name, m_db_user);
271       goto bail_out;
272    }
273
274    m_connected = true;
275
276    INGsetDefaultLockingMode(m_db_handle);
277
278    if (!check_tables_version(jcr, this)) {
279       goto bail_out;
280    }
281
282    retval = true;
283
284 bail_out:
285    V(mutex);
286    return retval;
287 }
288
289 void B_DB_INGRES::db_close_database(JCR *jcr)
290 {
291    if (m_connected) {
292       db_end_transaction(jcr);
293    }
294    P(mutex);
295    m_ref_count--;
296    if (m_ref_count == 0) {
297       if (m_connected) {
298          sql_free_result();
299       }
300       db_list->remove(this);
301       if (m_connected && m_db_handle) {
302          INGdisconnectDB(m_db_handle);
303       }
304       if (m_query_filters) {
305          db_destroy_query_filters(m_query_filters);
306       }
307       if (rwl_is_init(&m_lock)) {
308          rwl_destroy(&m_lock);
309       }
310       free_pool_memory(errmsg);
311       free_pool_memory(cmd);
312       free_pool_memory(cached_path);
313       free_pool_memory(fname);
314       free_pool_memory(path);
315       free_pool_memory(esc_name);
316       free_pool_memory(esc_path);
317       free_pool_memory(esc_obj);
318       free(m_db_driver);
319       free(m_db_name);
320       free(m_db_user);
321       if (m_db_password) {
322          free(m_db_password);
323       }
324       if (m_db_address) {
325          free(m_db_address);
326       }
327       if (m_db_socket) {
328          free(m_db_socket);
329       }
330       delete this;
331       if (db_list->size() == 0) {
332          delete db_list;
333          db_list = NULL;
334       }
335    }
336    V(mutex);
337 }
338
339 void B_DB_INGRES::db_thread_cleanup(void)
340 {
341 }
342
343 /*
344  * Escape strings so that Ingres is happy
345  *
346  *   NOTE! len is the length of the old string. Your new
347  *         string must be long enough (max 2*old+1) to hold
348  *         the escaped output.
349  */
350 void B_DB_INGRES::db_escape_string(JCR *jcr, char *snew, char *old, int len)
351 {
352    char *n, *o;
353
354    n = snew;
355    o = old;
356    while (len--) {
357       switch (*o) {
358       case '\'':
359          *n++ = '\'';
360          *n++ = '\'';
361          o++;
362          break;
363       case 0:
364          *n++ = '\\';
365          *n++ = 0;
366          o++;
367          break;
368       default:
369          *n++ = *o++;
370          break;
371       }
372    }
373    *n = 0;
374 }
375
376 /*
377  * Escape binary so that Ingres is happy
378  *
379  *   NOTE! Need to be implemented (escape \0)
380  *
381  */
382 char *B_DB_INGRES::db_escape_object(JCR *jcr, char *old, int len)
383 {
384    char *n, *o;
385
386    n = esc_obj = check_pool_memory_size(esc_obj, len*2+1);
387    o = old;
388    while (len--) {
389       switch (*o) {
390       case '\'':
391          *n++ = '\'';
392          *n++ = '\'';
393          o++;
394          break;
395       case 0:
396          *n++ = '\\';
397          *n++ = 0;
398          o++;
399          break;
400       default:
401          *n++ = *o++;
402          break;
403       }
404    }
405    *n = 0;
406    return esc_obj;
407 }
408
409 /*
410  * Unescape binary object so that Ingres is happy
411  *
412  * TODO: need to be implemented (escape \0)
413  */
414 void B_DB_INGRES::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
415                                      POOLMEM **dest, int32_t *dest_len)
416 {
417    if (!from) {
418       *dest[0] = 0;
419       *dest_len = 0;
420       return;
421    }
422    *dest = check_pool_memory_size(*dest, expected_len+1);
423    *dest_len = expected_len;
424    memcpy(*dest, from, expected_len);
425    (*dest)[expected_len]=0;
426 }
427
428 /*
429  * Start a transaction. This groups inserts and makes things
430  * much more efficient. Usually started when inserting
431  * file attributes.
432  */
433 void B_DB_INGRES::db_start_transaction(JCR *jcr)
434 {
435    if (!jcr->attr) {
436       jcr->attr = get_pool_memory(PM_FNAME);
437    }
438    if (!jcr->ar) {
439       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
440    }
441
442    if (!m_allow_transactions) {
443       return;
444    }
445
446    db_lock(this);
447    /* Allow only 25,000 changes per transaction */
448    if (m_transaction && changes > 25000) {
449       db_end_transaction(jcr);
450    }
451    if (!m_transaction) {
452       sql_query("BEGIN");  /* begin transaction */
453       Dmsg0(400, "Start Ingres transaction\n");
454       m_transaction = true;
455    }
456    db_unlock(this);
457 }
458
459 void B_DB_INGRES::db_end_transaction(JCR *jcr)
460 {
461    if (jcr && jcr->cached_attribute) {
462       Dmsg0(400, "Flush last cached attribute.\n");
463       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
464          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
465       }
466       jcr->cached_attribute = false;
467    }
468
469    if (!m_allow_transactions) {
470       return;
471    }
472
473    db_lock(this);
474    if (m_transaction) {
475       sql_query("COMMIT"); /* end transaction */
476       m_transaction = false;
477       Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
478    }
479    changes = 0;
480    db_unlock(this);
481 }
482
483 /*
484  * Submit a general SQL command (cmd), and for each row returned,
485  *  the result_handler is called with the ctx.
486  */
487 bool B_DB_INGRES::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
488 {
489    SQL_ROW row;
490    bool retval = true;
491
492    Dmsg1(500, "db_sql_query starts with %s\n", query);
493
494    db_lock(this);
495    if (!sql_query(query, QF_STORE_RESULT)) {
496       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
497       Dmsg0(500, "db_sql_query failed\n");
498       retval = false;
499       goto bail_out;
500    }
501
502    if (result_handler != NULL) {
503       Dmsg0(500, "db_sql_query invoking handler\n");
504       while ((row = sql_fetch_row()) != NULL) {
505          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
506          if (result_handler(ctx, m_num_fields, row))
507             break;
508       }
509       sql_free_result();
510    }
511
512    Dmsg0(500, "db_sql_query finished\n");
513
514 bail_out:
515    db_unlock(this);
516    return retval;
517 }
518
519 /*
520  * Note, if this routine returns false (failure), Bacula expects
521  * that no result has been stored.
522  *
523  *  Returns:  true  on success
524  *            false on failure
525  *
526  */
527 bool B_DB_INGRES::sql_query(const char *query, int flags)
528 {
529    int cols;
530    char *cp, *bp;
531    char *dup_query, *new_query;
532    bool retval = true;
533    bool start_of_transaction = false;
534    bool end_of_transaction = false;
535    B_DB_RWRULE *rewrite_rule;
536
537    Dmsg1(500, "query starts with '%s'\n", query);
538    /*
539     * We always make a private copy of the query as we are doing serious
540     * rewrites in this engine. When running the private copy through the
541     * different query filters we loose the orginal private copy so we
542     * first make a extra reference to it so we can free it on exit from the
543     * function.
544     */
545    dup_query = new_query = bstrdup(query);
546
547    /*
548     * Iterate over the query string and perform any needed operations.
549     * We use a sliding window over the query string where bp points to
550     * the previous position in the query and cp to the current position
551     * in the query.
552     */
553    bp = new_query;
554    while (bp != NULL) {
555       if ((cp = strchr(bp, ' ')) != NULL) {
556          *cp++;
557       }
558
559       if (!strncasecmp(bp, "BEGIN", 5)) {
560          /*
561           * This is the start of a transaction.
562           * Inline copy the rest of the query over the BEGIN keyword.
563           */
564          if (cp) {
565             strcpy(bp, cp);
566          } else {
567             *bp = '\0';
568          }
569          start_of_transaction = true;
570       } else if (!strncasecmp(bp, "COMMIT", 6) && (cp == NULL || strncasecmp(cp, "PRESERVE", 8))) {
571          /*
572           * This is the end of a transaction. We cannot check for just the COMMIT
573           * keyword as a DECLARE of an tempory table also has the word COMMIT in it
574           * but its followed by the word PRESERVE.
575           * Inline copy the rest of the query over the COMMIT keyword.
576           */
577          if (cp) {
578             strcpy(bp, cp);
579          } else {
580             *bp = '\0';
581          }
582          end_of_transaction = true;
583       }
584
585       /*
586        * See what query filter might match.
587        */
588       foreach_alist(rewrite_rule, m_query_filters) {
589          if (!strncasecmp(bp, rewrite_rule->search_pattern, rewrite_rule->pattern_length)) {
590             rewrite_rule->trigger = true;
591          }
592       }
593
594       /*
595        * Slide window.
596        */
597       bp = cp;
598    }
599
600    /*
601     * Run the query through all query filters that apply e.g. have the trigger set in the
602     * previous loop.
603     */
604    foreach_alist(rewrite_rule, m_query_filters) {
605       if (rewrite_rule->trigger) {
606          new_query = rewrite_rule->rewrite_regexp->replace(new_query);
607          rewrite_rule->trigger = false;
608       }
609    }
610
611    if (start_of_transaction) {
612       Dmsg0(500,"sql_query: Start of transaction\n");
613       m_explicit_commit = false;
614    }
615
616    /*
617     * See if there is any query left after filtering for certain keywords.
618     */
619    bp = new_query;
620    while (bp != NULL && strlen(bp) > 0) {
621       /*
622        * We are starting a new query.  reset everything.
623        */
624       m_num_rows     = -1;
625       m_row_number   = -1;
626       m_field_number = -1;
627
628       if (m_result) {
629          INGclear(m_result);  /* hmm, someone forgot to free?? */
630          m_result = NULL;
631       }
632
633       /*
634        * See if this is a multi-statement query. We split a multi-statement query
635        * on the semi-column and feed the individual queries to the Ingres functions.
636        * We use a sliding window over the query string where bp points to
637        * the previous position in the query and cp to the current position
638        * in the query.
639        */
640       if ((cp = strchr(bp, ';')) != NULL) {
641          *cp++ = '\0';
642       }
643
644       Dmsg1(500, "sql_query after rewrite continues with '%s'\n", bp);
645
646       /*
647        * See if we got a store_result hint which could mean we are running a select.
648        * If flags has QF_STORE_RESULT not set we are sure its not a query that we
649        * need to store anything for.
650        */
651       if (flags & QF_STORE_RESULT) {
652          cols = INGgetCols(m_db_handle, bp, m_explicit_commit);
653       } else {
654          cols = 0;
655       }
656
657       if (cols <= 0) {
658          if (cols < 0 ) {
659             Dmsg0(500,"sql_query: neg.columns: no DML stmt!\n");
660             retval = false;
661             goto bail_out;
662          }
663          Dmsg0(500,"sql_query (non SELECT) starting...\n");
664          /*
665           * non SELECT
666           */
667          m_num_rows = INGexec(m_db_handle, bp, m_explicit_commit);
668          if (m_num_rows == -1) {
669            Dmsg0(500,"sql_query (non SELECT) went wrong\n");
670            retval = false;
671            goto bail_out;
672          } else {
673            Dmsg0(500,"sql_query (non SELECT) seems ok\n");
674          }
675       } else {
676          /*
677           * SELECT
678           */
679          Dmsg0(500,"sql_query (SELECT) starting...\n");
680          m_result = INGquery(m_db_handle, bp, m_explicit_commit);
681          if (m_result != NULL) {
682             Dmsg0(500, "we have a result\n");
683
684             /*
685              * How many fields in the set?
686              */
687             m_num_fields = (int)INGnfields(m_result);
688             Dmsg1(500, "we have %d fields\n", m_num_fields);
689
690             m_num_rows = INGntuples(m_result);
691             Dmsg1(500, "we have %d rows\n", m_num_rows);
692          } else {
693             Dmsg0(500, "No resultset...\n");
694             retval = false;
695             goto bail_out;
696          }
697       }
698
699       bp = cp;
700    }
701
702 bail_out:
703    if (end_of_transaction) {
704       Dmsg0(500,"sql_query: End of transaction, commiting work\n");
705       m_explicit_commit = true;
706       INGcommit(m_db_handle);
707    }
708
709    free(dup_query);
710    Dmsg0(500, "sql_query finishing\n");
711
712    return retval;
713 }  
714
715 void B_DB_INGRES::sql_free_result(void)
716 {
717    db_lock(this);
718    if (m_result) {
719       INGclear(m_result);
720       m_result = NULL;
721    }
722    if (m_rows) {
723       free(m_rows);
724       m_rows = NULL;
725    }
726    if (m_fields) {
727       free(m_fields);
728       m_fields = NULL;
729    }
730    m_num_rows = m_num_fields = 0;
731    db_unlock(this);
732 }
733
734 SQL_ROW B_DB_INGRES::sql_fetch_row(void)
735 {
736    int j;
737    SQL_ROW row = NULL; /* by default, return NULL */
738
739    if (!m_result) {
740       return row;
741    }
742    if (m_result->num_rows <= 0) {
743       return row;
744    }
745
746    Dmsg0(500, "sql_fetch_row start\n");
747
748    if (!m_rows || m_rows_size < m_num_fields) {
749       if (m_rows) {
750          Dmsg0(500, "sql_fetch_row freeing space\n");
751          free(m_rows);
752       }
753       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
754       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
755       m_rows_size = m_num_fields;
756
757       /*
758        * Now reset the row_number now that we have the space allocated
759        */
760       m_row_number = 0;
761    }
762
763    /*
764     * If still within the result set
765     */
766    if (m_row_number < m_num_rows) {
767       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
768       /*
769        * Get each value from this row
770        */
771       for (j = 0; j < m_num_fields; j++) {
772          m_rows[j] = INGgetvalue(m_result, m_row_number, j);
773          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
774       }
775       /*
776        * Increment the row number for the next call
777        */
778       m_row_number++;
779
780       row = m_rows;
781    } else {
782       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
783    }
784
785    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
786
787    return row;
788 }
789
790 const char *B_DB_INGRES::sql_strerror(void)
791 {
792    return INGerrorMessage(m_db_handle);
793 }
794
795 void B_DB_INGRES::sql_data_seek(int row)
796 {
797    /*
798     * Set the row number to be returned on the next call to sql_fetch_row
799     */
800    m_row_number = row;
801 }
802
803 int B_DB_INGRES::sql_affected_rows(void)
804 {
805    return m_num_rows;
806 }
807
808 /*
809  * First execute the insert query and then retrieve the currval.
810  * By setting transaction to true we make it an atomic transaction
811  * and as such we can get the currval after which we commit if
812  * transaction is false. This way things are an atomic operation
813  * for Ingres and things work. We save the current transaction status
814  * and set transaction in the mdb to true and at the end of this
815  * function we restore the actual transaction status.
816  */
817 uint64_t B_DB_INGRES::sql_insert_autokey_record(const char *query, const char *table_name)
818 {
819    char sequence[64];
820    char getkeyval_query[256];
821    char *currval;
822    uint64_t id = 0;
823    bool current_explicit_commit;
824
825    /*
826     * Save the current transaction status and pretend we are in a transaction.
827     */
828    current_explicit_commit = m_explicit_commit;
829    m_explicit_commit = false;
830
831    /*
832     * Execute the INSERT query.
833     */
834    m_num_rows = INGexec(m_db_handle, query, m_explicit_commit);
835    if (m_num_rows == -1) {
836       goto bail_out;
837    }
838
839    changes++;
840
841    /*
842     * Obtain the current value of the sequence that
843     * provides the serial value for primary key of the table.
844     *
845     * currval is local to our session. It is not affected by
846     * other transactions.
847     *
848     * Determine the name of the sequence.
849     * As we name all sequences as <table>_seq this is easy.
850     */
851    bstrncpy(sequence, table_name, sizeof(sequence));
852    bstrncat(sequence, "_seq", sizeof(sequence));
853
854    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT %s.currval FROM %s", sequence, table_name);
855
856    if (m_result) {
857       INGclear(m_result);
858       m_result = NULL;
859    }
860    m_result = INGquery(m_db_handle, getkeyval_query, m_explicit_commit);
861
862    if (!m_result) {
863       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
864       goto bail_out;
865    }
866
867    Dmsg0(500, "exec done");
868
869    currval = INGgetvalue(m_result, 0, 0);
870    if (currval) {
871       id = str_to_uint64(currval);
872    }
873
874    INGclear(m_result);
875    m_result = NULL;
876
877 bail_out:
878    /*
879     * Restore the actual explicit_commit status.
880     */
881    m_explicit_commit = current_explicit_commit;
882
883    /*
884     * Commit if explicit_commit is not set.
885     */
886    if (m_explicit_commit) {
887       INGcommit(m_db_handle);
888    }
889
890    return id;
891 }
892
893 SQL_FIELD *B_DB_INGRES::sql_fetch_field(void)
894 {
895    int i, j;
896    int max_length;
897    int this_length;
898
899    if (!m_fields || m_fields_size < m_num_fields) {
900       if (m_fields) {
901          free(m_fields);
902          m_fields = NULL;
903       }
904       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
905       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
906       m_fields_size = m_num_fields;
907
908       for (i = 0; i < m_num_fields; i++) {
909          Dmsg1(500, "filling field %d\n", i);
910          m_fields[i].name = INGfname(m_result, i);
911          m_fields[i].type = INGftype(m_result, i);
912          m_fields[i].flags = 0;
913
914          /*
915           * For a given column, find the max length.
916           */
917          max_length = 0;
918          for (j = 0; j < m_num_rows; j++) {
919             if (INGgetisnull(m_result, j, i)) {
920                 this_length = 4;        /* "NULL" */
921             } else {
922                 this_length = cstrlen(INGgetvalue(m_result, j, i));
923             }
924
925             if (max_length < this_length) {
926                max_length = this_length;
927             }
928          }
929          m_fields[i].max_length = max_length;
930
931          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
932                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
933       }
934    }
935
936    /*
937     * Increment field number for the next time around
938     */
939    return &m_fields[m_field_number++];
940 }
941
942 bool B_DB_INGRES::sql_field_is_not_null(int field_type)
943 {
944    switch (field_type) {
945    case 1:
946       return true;
947    default:
948       return false;
949    }
950 }
951
952 bool B_DB_INGRES::sql_field_is_numeric(int field_type)
953 {
954    /*
955     * See ${II_SYSTEM}/ingres/files/eqsqlda.h for numeric types.
956     */
957    switch (field_type) {
958    case IISQ_DEC_TYPE:
959    case IISQ_INT_TYPE:
960    case IISQ_FLT_TYPE:
961       return true;
962    default:
963       return false;
964    }
965 }
966
967 /*
968  * Escape strings so that Ingres is happy on COPY
969  *
970  *   NOTE! len is the length of the old string. Your new
971  *         string must be long enough (max 2*old+1) to hold
972  *         the escaped output.
973  */
974 static char *ingres_copy_escape(char *dest, char *src, size_t len)
975 {
976    /* we have to escape \t, \n, \r, \ */
977    char c = '\0' ;
978
979    while (len > 0 && *src) {
980       switch (*src) {
981       case '\n':
982          c = 'n';
983          break;
984       case '\\':
985          c = '\\';
986          break;
987       case '\t':
988          c = 't';
989          break;
990       case '\r':
991          c = 'r';
992          break;
993       default:
994          c = '\0' ;
995       }
996
997       if (c) {
998          *dest = '\\';
999          dest++;
1000          *dest = c;
1001       } else {
1002          *dest = *src;
1003       }
1004
1005       len--;
1006       src++;
1007       dest++;
1008    }
1009
1010    *dest = '\0';
1011    return dest;
1012 }
1013
1014 /* 
1015  * Returns true if OK
1016  *         false if failed
1017  */
1018 bool B_DB_INGRES::sql_batch_start(JCR *jcr)
1019 {
1020    bool ok;
1021
1022    db_lock(this);
1023    ok = sql_query("DECLARE GLOBAL TEMPORARY TABLE batch ("
1024                            "FileIndex INTEGER,"
1025                            "JobId INTEGER,"
1026                            "Path VARBYTE(32000),"
1027                            "Name VARBYTE(32000),"
1028                            "LStat VARBYTE(255),"
1029                            "MD5 VARBYTE(255),"
1030                            "DeltaSeq SMALLINT)"
1031                            " ON COMMIT PRESERVE ROWS WITH NORECOVERY");
1032    db_unlock(this);
1033    return ok;
1034 }
1035
1036 /* 
1037  * Returns true if OK
1038  *         false if failed
1039  */
1040 bool B_DB_INGRES::sql_batch_end(JCR *jcr, const char *error)
1041 {
1042    m_status = 0;
1043    return true;
1044 }
1045
1046 /* 
1047  * Returns true if OK
1048  *         false if failed
1049  */
1050 bool B_DB_INGRES::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1051 {
1052    size_t len;
1053    const char *digest;
1054    char ed1[50];
1055
1056    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1057    db_escape_string(jcr, esc_name, fname, fnl);
1058
1059    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1060    db_escape_string(jcr, esc_path, path, pnl);
1061
1062    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1063       digest = "0";
1064    } else {
1065       digest = ar->Digest;
1066    }
1067
1068    len = Mmsg(cmd, "INSERT INTO batch VALUES "
1069                    "(%u,%s,'%s','%s','%s','%s',%u)",
1070                    ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1071                    esc_name, ar->attr, digest, ar->DeltaSeq);
1072
1073    return sql_query(cmd);
1074 }
1075
1076 /*
1077  * Initialize database data structure. In principal this should
1078  * never have errors, or it is really fatal.
1079  */
1080 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1081                        const char *db_password, const char *db_address, int db_port,
1082                        const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1083 {
1084    B_DB_INGRES *mdb = NULL;
1085
1086    if (!db_user) {
1087       Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
1088       return NULL;
1089    }
1090
1091    P(mutex);                          /* lock DB queue */
1092    if (db_list && !mult_db_connections) {
1093       /*
1094        * Look to see if DB already open
1095        */
1096       foreach_dlist(mdb, db_list) {
1097          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1098             Dmsg1(100, "DB REopen %s\n", db_name);
1099             mdb->increment_refcount();
1100             goto bail_out;
1101          }
1102       }
1103    }
1104
1105    Dmsg0(100, "db_init_database first time\n");
1106    mdb = New(B_DB_INGRES(jcr, db_driver, db_name, db_user, db_password, db_address,
1107                          db_port, db_socket, mult_db_connections, disable_batch_insert));
1108
1109 bail_out:
1110    V(mutex);
1111    return mdb;
1112 }
1113 #endif /* HAVE_INGRES */