]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/ingres.c
Ensure that bvfs SQL link is not shared
[bacula/bacula] / bacula / src / cats / ingres.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2011 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to Ingres
30  * These are Ingres specific routines
31  *
32  *    Stefan Reddig, June 2009 with help of Marco van Wieringen April 2010
33  *    based uopn work done 
34  *    by Dan Langille, December 2003 and
35  *    by Kern Sibbald, March 2000
36  *
37  * Major rewrite by Marco van Wieringen, January 2010 for catalog refactoring.
38  */
39
40 #include "bacula.h"
41
42 #ifdef HAVE_INGRES
43
44 #include "cats.h"
45 #include "bdb_priv.h"
46 #include "myingres.h"
47 #include "bdb_ingres.h"
48 #include "lib/breg.h"
49
50 /* -----------------------------------------------------------------------
51  *
52  *   Ingres dependent defines and subroutines
53  *
54  * -----------------------------------------------------------------------
55  */
56
57 /*
58  * List of open databases.
59  */
60 static dlist *db_list = NULL;
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 struct B_DB_RWRULE {
65    int pattern_length;
66    char *search_pattern;
67    BREGEXP *rewrite_regexp;
68    bool trigger;
69 };
70
71 /*
72  * Create a new query filter.
73  */
74 static bool db_allocate_query_filter(JCR *jcr, alist *query_filters, int pattern_length,
75                                      const char *search_pattern, const char *filter)
76 {
77    B_DB_RWRULE *rewrite_rule;
78
79    rewrite_rule = (B_DB_RWRULE *)malloc(sizeof(B_DB_RWRULE));
80
81    rewrite_rule->pattern_length = pattern_length;
82    rewrite_rule->search_pattern = bstrdup(search_pattern);
83    rewrite_rule->rewrite_regexp = new_bregexp(filter);
84    rewrite_rule->trigger = false;
85
86    if (!rewrite_rule->rewrite_regexp) {
87       Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filter.\n"));
88       free(rewrite_rule->search_pattern);
89       free(rewrite_rule);
90       return false;
91    } else {
92       query_filters->append(rewrite_rule);
93       return true;
94    }
95 }
96
97 /*
98  * Create a stack of all filters that should be applied to a SQL query
99  * before submitting it to the database backend.
100  */
101 static inline alist *db_initialize_query_filters(JCR *jcr)
102 {
103    alist *query_filters;
104
105    query_filters = New(alist(10, not_owned_by_alist));
106
107    if (!query_filters) {
108       Jmsg(jcr, M_FATAL, 0, _("Failed to allocate space for query filters.\n"));
109       return NULL;
110    }
111
112    db_allocate_query_filter(jcr, query_filters, 6, "OFFSET",
113                             "/LIMIT ([0-9]+) OFFSET ([0-9]+)/OFFSET $2 FETCH NEXT $1 ROWS ONLY/ig");
114    db_allocate_query_filter(jcr, query_filters, 5, "LIMIT",
115                             "/LIMIT ([0-9]+)/FETCH FIRST $1 ROWS ONLY/ig");
116    db_allocate_query_filter(jcr, query_filters, 9, "TEMPORARY",
117                             "/CREATE TEMPORARY TABLE (.+)/DECLARE GLOBAL TEMPORARY TABLE $1 ON COMMIT PRESERVE ROWS WITH NORECOVERY/i");
118
119    return query_filters;
120 }
121
122 /*
123  * Free all query filters.
124  */
125 static inline void db_destroy_query_filters(alist *query_filters)
126 {
127    B_DB_RWRULE *rewrite_rule;
128
129    foreach_alist(rewrite_rule, query_filters) {
130       free_bregexp(rewrite_rule->rewrite_regexp);
131       free(rewrite_rule->search_pattern);
132       free(rewrite_rule);
133    }
134
135    delete query_filters;
136 }
137
138 B_DB_INGRES::B_DB_INGRES(JCR *jcr,
139                          const char *db_driver,
140                          const char *db_name,
141                          const char *db_user,
142                          const char *db_password,
143                          const char *db_address,
144                          int db_port,
145                          const char *db_socket,
146                          bool mult_db_connections,
147                          bool disable_batch_insert)
148 {
149    B_DB_INGRES *mdb;
150    int next_session_id = 0;
151
152    /*
153     * See what the next available session_id is.
154     * We first see what the highest session_id is used now.
155     */
156    if (db_list) {
157       foreach_dlist(mdb, db_list) {
158          if (mdb->m_session_id > next_session_id) {
159             next_session_id = mdb->m_session_id;
160          }
161       }
162    }
163
164    /*
165     * Initialize the parent class members.
166     */
167    m_db_interface_type  = SQL_INTERFACE_TYPE_INGRES;
168    m_db_type = SQL_TYPE_INGRES;
169    m_db_driver = bstrdup("ingres");
170    m_db_name = bstrdup(db_name);
171    m_db_user = bstrdup(db_user);
172    if (db_password) {
173       m_db_password = bstrdup(db_password);
174    }
175    if (db_address) {
176       m_db_address = bstrdup(db_address);
177    }
178    if (db_socket) {
179       m_db_socket = bstrdup(db_socket);
180    }
181    m_db_port = db_port;
182    if (disable_batch_insert) {
183       m_disabled_batch_insert = true;
184       m_have_batch_insert = false;
185    } else {
186       m_disabled_batch_insert = false;
187 #if defined(USE_BATCH_FILE_INSERT)
188       m_have_batch_insert = true;
189 #else
190       m_have_batch_insert = false;
191 #endif
192    }
193
194    errmsg = get_pool_memory(PM_EMSG); /* get error message buffer */
195    *errmsg = 0;
196    cmd = get_pool_memory(PM_EMSG); /* get command buffer */
197    cached_path = get_pool_memory(PM_FNAME);
198    cached_path_id = 0;
199    m_ref_count = 1;
200    fname = get_pool_memory(PM_FNAME);
201    path = get_pool_memory(PM_FNAME);
202    esc_name = get_pool_memory(PM_FNAME);
203    esc_path = get_pool_memory(PM_FNAME);
204    esc_obj = get_pool_memory(PM_FNAME);
205    m_allow_transactions = mult_db_connections;
206
207    /* At this time, when mult_db_connections == true, this is for 
208     * specific console command such as bvfs or batch mode, and we don't
209     * want to share a batch mode or bvfs. In the future, we can change
210     * the creation function to add this parameter.
211     */
212    m_dedicated = mult_db_connections; 
213
214    /*
215     * Initialize the private members.
216     */
217    m_db_handle = NULL;
218    m_result = NULL;
219    m_explicit_commit = true;
220    m_session_id = ++next_session_id;
221    m_query_filters = db_initialize_query_filters(jcr);
222
223    /*
224     * Put the db in the list.
225     */
226    if (db_list == NULL) {
227       db_list = New(dlist(this, &this->m_link));
228    }
229    db_list->append(this);
230 }
231
232 B_DB_INGRES::~B_DB_INGRES()
233 {
234 }
235
236 /*
237  * Now actually open the database.  This can generate errors,
238  *   which are returned in the errmsg
239  *
240  * DO NOT close the database or delete mdb here !!!!
241  */
242 bool B_DB_INGRES::db_open_database(JCR *jcr)
243 {
244    bool retval = false;
245    int errstat;
246
247    P(mutex);
248    if (m_connected) {
249       retval = true;
250       goto bail_out;
251    }
252
253    if ((errstat=rwl_init(&m_lock)) != 0) {
254       berrno be;
255       Mmsg1(&errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
256             be.bstrerror(errstat));
257       goto bail_out;
258    }
259
260    m_db_handle = INGconnectDB(m_db_name, m_db_user, m_db_password, m_session_id);
261
262    Dmsg0(50, "Ingres real CONNECT done\n");
263    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", m_db_user, m_db_name,
264               m_db_password == NULL ? "(NULL)" : m_db_password);
265
266    if (!m_db_handle) {
267       Mmsg2(&errmsg, _("Unable to connect to Ingres server.\n"
268             "Database=%s User=%s\n"
269             "It is probably not running or your password is incorrect.\n"),
270              m_db_name, m_db_user);
271       goto bail_out;
272    }
273
274    m_connected = true;
275
276    INGsetDefaultLockingMode(m_db_handle);
277
278    if (!check_tables_version(jcr, this)) {
279       goto bail_out;
280    }
281
282    retval = true;
283
284 bail_out:
285    V(mutex);
286    return retval;
287 }
288
289 void B_DB_INGRES::db_close_database(JCR *jcr)
290 {
291    db_end_transaction(jcr);
292    P(mutex);
293    m_ref_count--;
294    if (m_ref_count == 0) {
295       sql_free_result();
296       db_list->remove(this);
297       if (m_connected && m_db_handle) {
298          INGdisconnectDB(m_db_handle);
299       }
300       if (m_query_filters) {
301          db_destroy_query_filters(m_query_filters);
302       }
303       rwl_destroy(&m_lock);
304       free_pool_memory(errmsg);
305       free_pool_memory(cmd);
306       free_pool_memory(cached_path);
307       free_pool_memory(fname);
308       free_pool_memory(path);
309       free_pool_memory(esc_name);
310       free_pool_memory(esc_path);
311       free_pool_memory(esc_obj);
312       free(m_db_driver);
313       free(m_db_name);
314       free(m_db_user);
315       if (m_db_password) {
316          free(m_db_password);
317       }
318       if (m_db_address) {
319          free(m_db_address);
320       }
321       if (m_db_socket) {
322          free(m_db_socket);
323       }
324       delete this;
325       if (db_list->size() == 0) {
326          delete db_list;
327          db_list = NULL;
328       }
329    }
330    V(mutex);
331 }
332
333 void B_DB_INGRES::db_thread_cleanup(void)
334 {
335 }
336
337 /*
338  * Escape strings so that Ingres is happy
339  *
340  *   NOTE! len is the length of the old string. Your new
341  *         string must be long enough (max 2*old+1) to hold
342  *         the escaped output.
343  */
344 void B_DB_INGRES::db_escape_string(JCR *jcr, char *snew, char *old, int len)
345 {
346    char *n, *o;
347
348    n = snew;
349    o = old;
350    while (len--) {
351       switch (*o) {
352       case '\'':
353          *n++ = '\'';
354          *n++ = '\'';
355          o++;
356          break;
357       case 0:
358          *n++ = '\\';
359          *n++ = 0;
360          o++;
361          break;
362       default:
363          *n++ = *o++;
364          break;
365       }
366    }
367    *n = 0;
368 }
369
370 /*
371  * Escape binary so that Ingres is happy
372  *
373  *   NOTE! Need to be implemented (escape \0)
374  *
375  */
376 char *B_DB_INGRES::db_escape_object(JCR *jcr, char *old, int len)
377 {
378    char *n, *o;
379
380    n = esc_obj = check_pool_memory_size(esc_obj, len*2+1);
381    o = old;
382    while (len--) {
383       switch (*o) {
384       case '\'':
385          *n++ = '\'';
386          *n++ = '\'';
387          o++;
388          break;
389       case 0:
390          *n++ = '\\';
391          *n++ = 0;
392          o++;
393          break;
394       default:
395          *n++ = *o++;
396          break;
397       }
398    }
399    *n = 0;
400    return esc_obj;
401 }
402
403 /*
404  * Unescape binary object so that Ingres is happy
405  *
406  * TODO: need to be implemented (escape \0)
407  */
408 void B_DB_INGRES::db_unescape_object(JCR *jcr, char *from, int32_t expected_len,
409                                      POOLMEM **dest, int32_t *dest_len)
410 {
411    if (!from) {
412       *dest[0] = 0;
413       *dest_len = 0;
414       return;
415    }
416    *dest = check_pool_memory_size(*dest, expected_len+1);
417    *dest_len = expected_len;
418    memcpy(*dest, from, expected_len);
419    (*dest)[expected_len]=0;
420 }
421
422 /*
423  * Start a transaction. This groups inserts and makes things
424  * much more efficient. Usually started when inserting
425  * file attributes.
426  */
427 void B_DB_INGRES::db_start_transaction(JCR *jcr)
428 {
429    if (!jcr->attr) {
430       jcr->attr = get_pool_memory(PM_FNAME);
431    }
432    if (!jcr->ar) {
433       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
434    }
435
436    if (!m_allow_transactions) {
437       return;
438    }
439
440    db_lock(this);
441    /* Allow only 25,000 changes per transaction */
442    if (m_transaction && changes > 25000) {
443       db_end_transaction(jcr);
444    }
445    if (!m_transaction) {
446       sql_query("BEGIN");  /* begin transaction */
447       Dmsg0(400, "Start Ingres transaction\n");
448       m_transaction = true;
449    }
450    db_unlock(this);
451 }
452
453 void B_DB_INGRES::db_end_transaction(JCR *jcr)
454 {
455    if (jcr && jcr->cached_attribute) {
456       Dmsg0(400, "Flush last cached attribute.\n");
457       if (!db_create_attributes_record(jcr, this, jcr->ar)) {
458          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
459       }
460       jcr->cached_attribute = false;
461    }
462
463    if (!m_allow_transactions) {
464       return;
465    }
466
467    db_lock(this);
468    if (m_transaction) {
469       sql_query("COMMIT"); /* end transaction */
470       m_transaction = false;
471       Dmsg1(400, "End Ingres transaction changes=%d\n", changes);
472    }
473    changes = 0;
474    db_unlock(this);
475 }
476
477 /*
478  * Submit a general SQL command (cmd), and for each row returned,
479  *  the result_handler is called with the ctx.
480  */
481 bool B_DB_INGRES::db_sql_query(const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
482 {
483    SQL_ROW row;
484    bool retval = true;
485
486    Dmsg1(500, "db_sql_query starts with %s\n", query);
487
488    db_lock(this);
489    if (!sql_query(query, QF_STORE_RESULT)) {
490       Mmsg(errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror());
491       Dmsg0(500, "db_sql_query failed\n");
492       retval = false;
493       goto bail_out;
494    }
495
496    if (result_handler != NULL) {
497       Dmsg0(500, "db_sql_query invoking handler\n");
498       while ((row = sql_fetch_row()) != NULL) {
499          Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
500          if (result_handler(ctx, m_num_fields, row))
501             break;
502       }
503       sql_free_result();
504    }
505
506    Dmsg0(500, "db_sql_query finished\n");
507
508 bail_out:
509    db_unlock(this);
510    return retval;
511 }
512
513 /*
514  * Note, if this routine returns false (failure), Bacula expects
515  * that no result has been stored.
516  *
517  *  Returns:  true  on success
518  *            false on failure
519  *
520  */
521 bool B_DB_INGRES::sql_query(const char *query, int flags)
522 {
523    int cols;
524    char *cp, *bp;
525    char *dup_query, *new_query;
526    bool retval = true;
527    bool start_of_transaction = false;
528    bool end_of_transaction = false;
529    B_DB_RWRULE *rewrite_rule;
530
531    Dmsg1(500, "query starts with '%s'\n", query);
532    /*
533     * We always make a private copy of the query as we are doing serious
534     * rewrites in this engine. When running the private copy through the
535     * different query filters we loose the orginal private copy so we
536     * first make a extra reference to it so we can free it on exit from the
537     * function.
538     */
539    dup_query = new_query = bstrdup(query);
540
541    /*
542     * Iterate over the query string and perform any needed operations.
543     * We use a sliding window over the query string where bp points to
544     * the previous position in the query and cp to the current position
545     * in the query.
546     */
547    bp = new_query;
548    while (bp != NULL) {
549       if ((cp = strchr(bp, ' ')) != NULL) {
550          *cp++;
551       }
552
553       if (!strncasecmp(bp, "BEGIN", 5)) {
554          /*
555           * This is the start of a transaction.
556           * Inline copy the rest of the query over the BEGIN keyword.
557           */
558          if (cp) {
559             strcpy(bp, cp);
560          } else {
561             *bp = '\0';
562          }
563          start_of_transaction = true;
564       } else if (!strncasecmp(bp, "COMMIT", 6) && (cp == NULL || strncasecmp(cp, "PRESERVE", 8))) {
565          /*
566           * This is the end of a transaction. We cannot check for just the COMMIT
567           * keyword as a DECLARE of an tempory table also has the word COMMIT in it
568           * but its followed by the word PRESERVE.
569           * Inline copy the rest of the query over the COMMIT keyword.
570           */
571          if (cp) {
572             strcpy(bp, cp);
573          } else {
574             *bp = '\0';
575          }
576          end_of_transaction = true;
577       }
578
579       /*
580        * See what query filter might match.
581        */
582       foreach_alist(rewrite_rule, m_query_filters) {
583          if (!strncasecmp(bp, rewrite_rule->search_pattern, rewrite_rule->pattern_length)) {
584             rewrite_rule->trigger = true;
585          }
586       }
587
588       /*
589        * Slide window.
590        */
591       bp = cp;
592    }
593
594    /*
595     * Run the query through all query filters that apply e.g. have the trigger set in the
596     * previous loop.
597     */
598    foreach_alist(rewrite_rule, m_query_filters) {
599       if (rewrite_rule->trigger) {
600          new_query = rewrite_rule->rewrite_regexp->replace(new_query);
601          rewrite_rule->trigger = false;
602       }
603    }
604
605    if (start_of_transaction) {
606       Dmsg0(500,"sql_query: Start of transaction\n");
607       m_explicit_commit = false;
608    }
609
610    /*
611     * See if there is any query left after filtering for certain keywords.
612     */
613    bp = new_query;
614    while (bp != NULL && strlen(bp) > 0) {
615       /*
616        * We are starting a new query.  reset everything.
617        */
618       m_num_rows     = -1;
619       m_row_number   = -1;
620       m_field_number = -1;
621
622       if (m_result) {
623          INGclear(m_result);  /* hmm, someone forgot to free?? */
624          m_result = NULL;
625       }
626
627       /*
628        * See if this is a multi-statement query. We split a multi-statement query
629        * on the semi-column and feed the individual queries to the Ingres functions.
630        * We use a sliding window over the query string where bp points to
631        * the previous position in the query and cp to the current position
632        * in the query.
633        */
634       if ((cp = strchr(bp, ';')) != NULL) {
635          *cp++ = '\0';
636       }
637
638       Dmsg1(500, "sql_query after rewrite continues with '%s'\n", bp);
639
640       /*
641        * See if we got a store_result hint which could mean we are running a select.
642        * If flags has QF_STORE_RESULT not set we are sure its not a query that we
643        * need to store anything for.
644        */
645       if (flags & QF_STORE_RESULT) {
646          cols = INGgetCols(m_db_handle, bp, m_explicit_commit);
647       } else {
648          cols = 0;
649       }
650
651       if (cols <= 0) {
652          if (cols < 0 ) {
653             Dmsg0(500,"sql_query: neg.columns: no DML stmt!\n");
654             retval = false;
655             goto bail_out;
656          }
657          Dmsg0(500,"sql_query (non SELECT) starting...\n");
658          /*
659           * non SELECT
660           */
661          m_num_rows = INGexec(m_db_handle, bp, m_explicit_commit);
662          if (m_num_rows == -1) {
663            Dmsg0(500,"sql_query (non SELECT) went wrong\n");
664            retval = false;
665            goto bail_out;
666          } else {
667            Dmsg0(500,"sql_query (non SELECT) seems ok\n");
668          }
669       } else {
670          /*
671           * SELECT
672           */
673          Dmsg0(500,"sql_query (SELECT) starting...\n");
674          m_result = INGquery(m_db_handle, bp, m_explicit_commit);
675          if (m_result != NULL) {
676             Dmsg0(500, "we have a result\n");
677
678             /*
679              * How many fields in the set?
680              */
681             m_num_fields = (int)INGnfields(m_result);
682             Dmsg1(500, "we have %d fields\n", m_num_fields);
683
684             m_num_rows = INGntuples(m_result);
685             Dmsg1(500, "we have %d rows\n", m_num_rows);
686          } else {
687             Dmsg0(500, "No resultset...\n");
688             retval = false;
689             goto bail_out;
690          }
691       }
692
693       bp = cp;
694    }
695
696 bail_out:
697    if (end_of_transaction) {
698       Dmsg0(500,"sql_query: End of transaction, commiting work\n");
699       m_explicit_commit = true;
700       INGcommit(m_db_handle);
701    }
702
703    free(dup_query);
704    Dmsg0(500, "sql_query finishing\n");
705
706    return retval;
707 }  
708
709 void B_DB_INGRES::sql_free_result(void)
710 {
711    db_lock(this);
712    if (m_result) {
713       INGclear(m_result);
714       m_result = NULL;
715    }
716    if (m_rows) {
717       free(m_rows);
718       m_rows = NULL;
719    }
720    if (m_fields) {
721       free(m_fields);
722       m_fields = NULL;
723    }
724    m_num_rows = m_num_fields = 0;
725    db_unlock(this);
726 }
727
728 SQL_ROW B_DB_INGRES::sql_fetch_row(void)
729 {
730    int j;
731    SQL_ROW row = NULL; /* by default, return NULL */
732
733    if (!m_result) {
734       return row;
735    }
736    if (m_result->num_rows <= 0) {
737       return row;
738    }
739
740    Dmsg0(500, "sql_fetch_row start\n");
741
742    if (!m_rows || m_rows_size < m_num_fields) {
743       if (m_rows) {
744          Dmsg0(500, "sql_fetch_row freeing space\n");
745          free(m_rows);
746       }
747       Dmsg1(500, "we need space for %d bytes\n", sizeof(char *) * m_num_fields);
748       m_rows = (SQL_ROW)malloc(sizeof(char *) * m_num_fields);
749       m_rows_size = m_num_fields;
750
751       /*
752        * Now reset the row_number now that we have the space allocated
753        */
754       m_row_number = 0;
755    }
756
757    /*
758     * If still within the result set
759     */
760    if (m_row_number < m_num_rows) {
761       Dmsg2(500, "sql_fetch_row row number '%d' is acceptable (0..%d)\n", m_row_number, m_num_rows);
762       /*
763        * Get each value from this row
764        */
765       for (j = 0; j < m_num_fields; j++) {
766          m_rows[j] = INGgetvalue(m_result, m_row_number, j);
767          Dmsg2(500, "sql_fetch_row field '%d' has value '%s'\n", j, m_rows[j]);
768       }
769       /*
770        * Increment the row number for the next call
771        */
772       m_row_number++;
773
774       row = m_rows;
775    } else {
776       Dmsg2(500, "sql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", m_row_number, m_num_rows);
777    }
778
779    Dmsg1(500, "sql_fetch_row finishes returning %p\n", row);
780
781    return row;
782 }
783
784 const char *B_DB_INGRES::sql_strerror(void)
785 {
786    return INGerrorMessage(m_db_handle);
787 }
788
789 void B_DB_INGRES::sql_data_seek(int row)
790 {
791    /*
792     * Set the row number to be returned on the next call to sql_fetch_row
793     */
794    m_row_number = row;
795 }
796
797 int B_DB_INGRES::sql_affected_rows(void)
798 {
799    return m_num_rows;
800 }
801
802 /*
803  * First execute the insert query and then retrieve the currval.
804  * By setting transaction to true we make it an atomic transaction
805  * and as such we can get the currval after which we commit if
806  * transaction is false. This way things are an atomic operation
807  * for Ingres and things work. We save the current transaction status
808  * and set transaction in the mdb to true and at the end of this
809  * function we restore the actual transaction status.
810  */
811 uint64_t B_DB_INGRES::sql_insert_autokey_record(const char *query, const char *table_name)
812 {
813    char sequence[64];
814    char getkeyval_query[256];
815    char *currval;
816    uint64_t id = 0;
817    bool current_explicit_commit;
818
819    /*
820     * Save the current transaction status and pretend we are in a transaction.
821     */
822    current_explicit_commit = m_explicit_commit;
823    m_explicit_commit = false;
824
825    /*
826     * Execute the INSERT query.
827     */
828    m_num_rows = INGexec(m_db_handle, query, m_explicit_commit);
829    if (m_num_rows == -1) {
830       goto bail_out;
831    }
832
833    changes++;
834
835    /*
836     * Obtain the current value of the sequence that
837     * provides the serial value for primary key of the table.
838     *
839     * currval is local to our session. It is not affected by
840     * other transactions.
841     *
842     * Determine the name of the sequence.
843     * As we name all sequences as <table>_seq this is easy.
844     */
845    bstrncpy(sequence, table_name, sizeof(sequence));
846    bstrncat(sequence, "_seq", sizeof(sequence));
847
848    bsnprintf(getkeyval_query, sizeof(getkeyval_query), "SELECT %s.currval FROM %s", sequence, table_name);
849
850    if (m_result) {
851       INGclear(m_result);
852       m_result = NULL;
853    }
854    m_result = INGquery(m_db_handle, getkeyval_query, m_explicit_commit);
855
856    if (!m_result) {
857       Dmsg1(50, "Query failed: %s\n", getkeyval_query);
858       goto bail_out;
859    }
860
861    Dmsg0(500, "exec done");
862
863    currval = INGgetvalue(m_result, 0, 0);
864    if (currval) {
865       id = str_to_uint64(currval);
866    }
867
868    INGclear(m_result);
869    m_result = NULL;
870
871 bail_out:
872    /*
873     * Restore the actual explicit_commit status.
874     */
875    m_explicit_commit = current_explicit_commit;
876
877    /*
878     * Commit if explicit_commit is not set.
879     */
880    if (m_explicit_commit) {
881       INGcommit(m_db_handle);
882    }
883
884    return id;
885 }
886
887 SQL_FIELD *B_DB_INGRES::sql_fetch_field(void)
888 {
889    int i, j;
890    int max_length;
891    int this_length;
892
893    if (!m_fields || m_fields_size < m_num_fields) {
894       if (m_fields) {
895          free(m_fields);
896          m_fields = NULL;
897       }
898       Dmsg1(500, "allocating space for %d fields\n", m_num_fields);
899       m_fields = (SQL_FIELD *)malloc(sizeof(SQL_FIELD) * m_num_fields);
900       m_fields_size = m_num_fields;
901
902       for (i = 0; i < m_num_fields; i++) {
903          Dmsg1(500, "filling field %d\n", i);
904          m_fields[i].name = INGfname(m_result, i);
905          m_fields[i].type = INGftype(m_result, i);
906          m_fields[i].flags = 0;
907
908          /*
909           * For a given column, find the max length.
910           */
911          max_length = 0;
912          for (j = 0; j < m_num_rows; j++) {
913             if (INGgetisnull(m_result, j, i)) {
914                 this_length = 4;        /* "NULL" */
915             } else {
916                 this_length = cstrlen(INGgetvalue(m_result, j, i));
917             }
918
919             if (max_length < this_length) {
920                max_length = this_length;
921             }
922          }
923          m_fields[i].max_length = max_length;
924
925          Dmsg4(500, "sql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
926                m_fields[i].name, m_fields[i].max_length, m_fields[i].type, m_fields[i].flags);
927       }
928    }
929
930    /*
931     * Increment field number for the next time around
932     */
933    return &m_fields[m_field_number++];
934 }
935
936 bool B_DB_INGRES::sql_field_is_not_null(int field_type)
937 {
938    switch (field_type) {
939    case 1:
940       return true;
941    default:
942       return false;
943    }
944 }
945
946 bool B_DB_INGRES::sql_field_is_numeric(int field_type)
947 {
948    /*
949     * See ${II_SYSTEM}/ingres/files/eqsqlda.h for numeric types.
950     */
951    switch (field_type) {
952    case IISQ_DEC_TYPE:
953    case IISQ_INT_TYPE:
954    case IISQ_FLT_TYPE:
955       return true;
956    default:
957       return false;
958    }
959 }
960
961 /*
962  * Escape strings so that Ingres is happy on COPY
963  *
964  *   NOTE! len is the length of the old string. Your new
965  *         string must be long enough (max 2*old+1) to hold
966  *         the escaped output.
967  */
968 static char *ingres_copy_escape(char *dest, char *src, size_t len)
969 {
970    /* we have to escape \t, \n, \r, \ */
971    char c = '\0' ;
972
973    while (len > 0 && *src) {
974       switch (*src) {
975       case '\n':
976          c = 'n';
977          break;
978       case '\\':
979          c = '\\';
980          break;
981       case '\t':
982          c = 't';
983          break;
984       case '\r':
985          c = 'r';
986          break;
987       default:
988          c = '\0' ;
989       }
990
991       if (c) {
992          *dest = '\\';
993          dest++;
994          *dest = c;
995       } else {
996          *dest = *src;
997       }
998
999       len--;
1000       src++;
1001       dest++;
1002    }
1003
1004    *dest = '\0';
1005    return dest;
1006 }
1007
1008 /* 
1009  * Returns true if OK
1010  *         false if failed
1011  */
1012 bool B_DB_INGRES::sql_batch_start(JCR *jcr)
1013 {
1014    bool ok;
1015
1016    db_lock(this);
1017    ok = sql_query("DECLARE GLOBAL TEMPORARY TABLE batch ("
1018                            "FileIndex INTEGER,"
1019                            "JobId INTEGER,"
1020                            "Path VARBYTE(32000),"
1021                            "Name VARBYTE(32000),"
1022                            "LStat VARBYTE(255),"
1023                            "MD5 VARBYTE(255),"
1024                            "DeltaSeq SMALLINT)"
1025                            " ON COMMIT PRESERVE ROWS WITH NORECOVERY");
1026    db_unlock(this);
1027    return ok;
1028 }
1029
1030 /* 
1031  * Returns true if OK
1032  *         false if failed
1033  */
1034 bool B_DB_INGRES::sql_batch_end(JCR *jcr, const char *error)
1035 {
1036    m_status = 0;
1037    return true;
1038 }
1039
1040 /* 
1041  * Returns true if OK
1042  *         false if failed
1043  */
1044 bool B_DB_INGRES::sql_batch_insert(JCR *jcr, ATTR_DBR *ar)
1045 {
1046    size_t len;
1047    const char *digest;
1048    char ed1[50];
1049
1050    esc_name = check_pool_memory_size(esc_name, fnl*2+1);
1051    db_escape_string(jcr, esc_name, fname, fnl);
1052
1053    esc_path = check_pool_memory_size(esc_path, pnl*2+1);
1054    db_escape_string(jcr, esc_path, path, pnl);
1055
1056    if (ar->Digest == NULL || ar->Digest[0] == 0) {
1057       digest = "0";
1058    } else {
1059       digest = ar->Digest;
1060    }
1061
1062    len = Mmsg(cmd, "INSERT INTO batch VALUES "
1063                    "(%u,%s,'%s','%s','%s','%s',%u)",
1064                    ar->FileIndex, edit_int64(ar->JobId,ed1), esc_path, 
1065                    esc_name, ar->attr, digest, ar->DeltaSeq);
1066
1067    return sql_query(cmd);
1068 }
1069
1070 /*
1071  * Initialize database data structure. In principal this should
1072  * never have errors, or it is really fatal.
1073  */
1074 B_DB *db_init_database(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user,
1075                        const char *db_password, const char *db_address, int db_port,
1076                        const char *db_socket, bool mult_db_connections, bool disable_batch_insert)
1077 {
1078    B_DB_INGRES *mdb = NULL;
1079
1080    if (!db_user) {
1081       Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
1082       return NULL;
1083    }
1084
1085    P(mutex);                          /* lock DB queue */
1086    if (db_list && !mult_db_connections) {
1087       /*
1088        * Look to see if DB already open
1089        */
1090       foreach_dlist(mdb, db_list) {
1091          if (mdb->db_match_database(db_driver, db_name, db_address, db_port)) {
1092             Dmsg1(100, "DB REopen %s\n", db_name);
1093             mdb->increment_refcount();
1094             goto bail_out;
1095          }
1096       }
1097    }
1098
1099    Dmsg0(100, "db_init_database first time\n");
1100    mdb = New(B_DB_INGRES(jcr, db_driver, db_name, db_user, db_password, db_address,
1101                          db_port, db_socket, mult_db_connections, disable_batch_insert));
1102
1103 bail_out:
1104    V(mutex);
1105    return mdb;
1106 }
1107 #endif /* HAVE_INGRES */