]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/ingres.c
Revert "Fix typo"
[bacula/bacula] / bacula / src / cats / ingres.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to Ingres
30  *   These are Ingres specific routines
31  *
32  *    Stefan Reddig, June 2009 with help of Marco van Wieringen April 2010
33  *    based uopn work done 
34  *    by Dan Langille, December 2003 and
35  *    by Kern Sibbald, March 2000
36  *
37  */
38
39
40 /* The following is necessary so that we do not include
41  * the dummy external definition of DB.
42  */
43 #define __SQL_C                       /* indicate that this is sql.c */
44
45 #include "bacula.h"
46 #include "cats.h"
47
48 #ifdef HAVE_INGRES
49
50 #include "myingres.h"
51
52 /* -----------------------------------------------------------------------
53  *
54  *   Ingres dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "Ingres";
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb;
83    int next_session_id = 0;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101
102          if (mdb->session_id > next_session_id) {
103             next_session_id = mdb->session_id;
104          }
105       }
106    } else {
107       /*
108        * See what the next available session_id is.
109        * We first see what the highest session_id is used now.
110        */
111       foreach_dlist(mdb, db_list) {
112          if (mdb->session_id > next_session_id) {
113             next_session_id = mdb->session_id;
114          }
115       }
116    }
117    Dmsg0(100, "db_open first time\n");
118    mdb = (B_DB *)malloc(sizeof(B_DB));
119    memset(mdb, 0, sizeof(B_DB));
120    mdb->db_name = bstrdup(db_name);
121    mdb->db_user = bstrdup(db_user);
122    if (db_password) {
123       mdb->db_password = bstrdup(db_password);
124    }
125    if (db_address) {
126       mdb->db_address  = bstrdup(db_address);
127    }
128    if (db_socket) {
129       mdb->db_socket   = bstrdup(db_socket);
130    }
131    mdb->db_port        = db_port;
132    mdb->session_id     = ++next_session_id;
133    mdb->have_insert_id = TRUE;
134    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
135    *mdb->errmsg        = 0;
136    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
137    mdb->cached_path    = get_pool_memory(PM_FNAME);
138    mdb->cached_path_id = 0;
139    mdb->ref_count      = 1;
140    mdb->fname          = get_pool_memory(PM_FNAME);
141    mdb->path           = get_pool_memory(PM_FNAME);
142    mdb->esc_name       = get_pool_memory(PM_FNAME);
143    mdb->esc_path      = get_pool_memory(PM_FNAME);
144    mdb->allow_transactions = mult_db_connections;
145    mdb->limit_filter = new_bregexp("/LIMIT ([0-9]+)/FETCH FIRST $1 ROW ONLY/g");
146    qinsert(&db_list, &mdb->bq);            /* put db in list */
147    V(mutex);
148    return mdb;
149 }
150
151 /* Check that the database correspond to the encoding we want */
152 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
153 {
154 /* SRE: TODO! Needed?
155  SQL_ROW row;
156    int ret=false;
157
158    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
159       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
160       return false;
161    }
162
163    if ((row = sql_fetch_row(mdb)) == NULL) {
164       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
165       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
166    } else {
167       ret = bstrcmp(row[0], "SQL_ASCII");
168       if (!ret) {
169          Mmsg(mdb->errmsg, 
170               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
171               mdb->db_name, row[0]);
172          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
173          Dmsg1(50, "%s", mdb->errmsg);
174       } 
175    }
176    return ret;
177 */
178     return true;
179 }
180
181 /*
182  * Now actually open the database.  This can generate errors,
183  *   which are returned in the errmsg
184  *
185  * DO NOT close the database or free(mdb) here !!!!
186  */
187 int
188 db_open_database(JCR *jcr, B_DB *mdb)
189 {
190    int errstat;
191    char buf[10], *port;
192
193    P(mutex);
194    if (mdb->connected) {
195       V(mutex);
196       return 1;
197    }
198    mdb->connected = false;
199
200    if ((errstat=rwl_init(&mdb->lock)) != 0) {
201       berrno be;
202       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
203             be.bstrerror(errstat));
204       V(mutex);
205       return 0;
206    }
207
208    if (mdb->db_port) {
209       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
210       port = buf;
211    } else {
212       port = NULL;
213    }
214
215    mdb->db = INGconnectDB(mdb->db_name, mdb->db_user, mdb->db_password, mdb->session_id);
216
217    Dmsg0(50, "Ingres real CONNECT done\n");
218    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
219             mdb->db_password==NULL?"(NULL)":mdb->db_password);
220
221    if (!mdb->db) {
222       Mmsg2(&mdb->errmsg, _("Unable to connect to Ingres server.\n"
223             "Database=%s User=%s\n"
224             "It is probably not running or your password is incorrect.\n"),
225              mdb->db_name, mdb->db_user);
226       V(mutex);
227       return 0;
228    }
229
230    mdb->connected = true;
231
232    if (!check_tables_version(jcr, mdb)) {
233       V(mutex);
234       return 0;
235    }
236
237    //sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
238    
239    /* check that encoding is SQL_ASCII */
240    check_database_encoding(jcr, mdb);
241
242    V(mutex);
243    return 1;
244 }
245
246 void
247 db_close_database(JCR *jcr, B_DB *mdb)
248 {
249    if (!mdb) {
250       return;
251    }
252    db_end_transaction(jcr, mdb);
253    P(mutex);
254    sql_free_result(mdb);
255    mdb->ref_count--;
256    if (mdb->ref_count == 0) {
257       qdchain(&mdb->bq);
258       if (mdb->connected && mdb->db) {
259          sql_close(mdb);
260       }
261       rwl_destroy(&mdb->lock);
262       free_pool_memory(mdb->errmsg);
263       free_pool_memory(mdb->cmd);
264       free_pool_memory(mdb->cached_path);
265       free_pool_memory(mdb->fname);
266       free_pool_memory(mdb->path);
267       free_pool_memory(mdb->esc_name);
268       free_pool_memory(mdb->esc_path);
269       free_bregexp(mdb->limit_filter);
270       if (mdb->db_name) {
271          free(mdb->db_name);
272       }
273       if (mdb->db_user) {
274          free(mdb->db_user);
275       }
276       if (mdb->db_password) {
277          free(mdb->db_password);
278       }
279       if (mdb->db_address) {
280          free(mdb->db_address);
281       }
282       if (mdb->db_socket) {
283          free(mdb->db_socket);
284       }
285       free(mdb);
286    }
287    V(mutex);
288 }
289
290 void db_check_backend_thread_safe()
291 { }
292
293
294
295 void db_thread_cleanup()
296 { }
297
298 /*
299  * Return the next unique index (auto-increment) for
300  * the given table.  Return NULL on error.
301  *
302  * For Ingres, NULL causes the auto-increment value     SRE: true?
303  *  to be updated.
304  */
305 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
306 {
307    strcpy(index, "NULL");
308    return 1;
309 }
310
311
312 /*
313  * Escape strings so that Ingres is happy
314  *
315  *   NOTE! len is the length of the old string. Your new
316  *         string must be long enough (max 2*old+1) to hold
317  *         the escaped output.
318  */
319 void
320 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
321 {
322    char *n, *o;
323
324    n = snew;
325    o = old;
326    while (len--) {
327       switch (*o) {
328       case '\'':
329          *n++ = '\'';
330          *n++ = '\'';
331          o++;
332          break;
333       case 0:
334          *n++ = '\\';
335          *n++ = 0;
336          o++;
337          break;
338       default:
339          *n++ = *o++;
340          break;
341       }
342    }
343    *n = 0;
344 }
345
346 /*
347  * Submit a general SQL command (cmd), and for each row returned,
348  *  the sqlite_handler is called with the ctx.
349  */
350 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
351 {
352    SQL_ROW row;
353
354    Dmsg0(500, "db_sql_query started\n");
355
356    db_lock(mdb);
357    if (sql_query(mdb, query) != 0) {
358       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
359       db_unlock(mdb);
360       Dmsg0(500, "db_sql_query failed\n");
361       return false;
362    }
363    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
364
365    if (result_handler != NULL) {
366       Dmsg0(500, "db_sql_query invoking handler\n");
367       if (mdb->result != NULL) {
368          int num_fields = sql_num_fields(mdb);
369
370          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
371          while ((row = sql_fetch_row(mdb)) != NULL) {
372
373             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
374             if (result_handler(ctx, num_fields, row))
375                break;
376          }
377
378         sql_free_result(mdb);
379       }
380    }
381    db_unlock(mdb);
382
383    Dmsg0(500, "db_sql_query finished\n");
384
385    return true;
386 }
387
388 /*
389  * Close database connection
390  */
391 void my_ingres_close(B_DB *mdb)
392 {
393    Dmsg0(500, "my_ingres_close closing database connection\n");
394    INGdisconnectDB(mdb->db);
395    //SRE: error handling? 
396 }
397
398 INGRES_ROW my_ingres_fetch_row(B_DB *mdb)
399 {
400    int j;
401    INGRES_ROW row = NULL; // by default, return NULL
402
403    if (!mdb->result) {
404       return row;
405    }
406    if (mdb->result->num_rows <= 0) {
407       return row;
408    }
409
410    Dmsg0(500, "my_ingres_fetch_row start\n");
411
412    if (!mdb->row || mdb->row_size < mdb->num_fields) {
413       int num_fields = mdb->num_fields;
414       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
415
416       if (mdb->row) {
417          Dmsg0(500, "my_ingres_fetch_row freeing space\n");
418          free(mdb->row);
419       }
420       num_fields += 20;                  /* add a bit extra */
421       mdb->row = (INGRES_ROW)malloc(sizeof(char *) * num_fields);
422       mdb->row_size = num_fields;
423
424       // now reset the row_number now that we have the space allocated
425       //mdb->row_number = 1;
426       mdb->row_number = 0;
427    }
428
429    // if still within the result set
430    //if (mdb->row_number <= mdb->num_rows) {
431    if (mdb->row_number < mdb->num_rows) {
432       Dmsg2(500, "my_ingres_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
433       // get each value from this row
434       for (j = 0; j < mdb->num_fields; j++) {
435          mdb->row[j] = INGgetvalue(mdb->result, mdb->row_number, j);
436          Dmsg2(500, "my_ingres_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
437       }
438       // increment the row number for the next call
439       mdb->row_number++;
440
441       row = mdb->row;
442    } else {
443       Dmsg2(500, "my_ingres_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
444    }
445
446    Dmsg1(500, "my_ingres_fetch_row finishes returning %p\n", row);
447
448    return row;
449 }
450
451
452 int my_ingres_max_length(B_DB *mdb, int field_num) {
453    //
454    // for a given column, find the max length
455    //
456    int max_length;
457    int i;
458    int this_length;
459
460    max_length = 0;
461    for (i = 0; i < mdb->num_rows; i++) {
462       if (INGgetisnull(mdb->result, i, field_num)) {
463           this_length = 4;        // "NULL"
464       } else {
465           this_length = cstrlen(INGgetvalue(mdb->result, i, field_num));
466       }
467
468       if (max_length < this_length) {
469           max_length = this_length;
470       }
471    }
472
473    return max_length;
474 }
475
476 INGRES_FIELD * my_ingres_fetch_field(B_DB *mdb)
477 {
478    int i;
479
480    Dmsg0(500, "my_ingres_fetch_field starts\n");
481
482    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
483       if (mdb->fields) {
484          free(mdb->fields);
485       }
486       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
487       mdb->fields = (INGRES_FIELD *)malloc(sizeof(INGRES_FIELD) * mdb->num_fields);
488       mdb->fields_size = mdb->num_fields;
489
490       for (i = 0; i < mdb->num_fields; i++) {
491          Dmsg1(500, "filling field %d\n", i);
492          strcpy(mdb->fields[i].name,INGfname(mdb->result, i));
493          mdb->fields[i].max_length = my_ingres_max_length(mdb, i);
494          mdb->fields[i].type       = INGftype(mdb->result, i);
495          mdb->fields[i].flags      = 0;
496
497          Dmsg4(500, "my_ingres_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
498             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
499             mdb->fields[i].flags);
500       } // end for
501    } // end if
502
503    // increment field number for the next time around
504
505    Dmsg0(500, "my_ingres_fetch_field finishes\n");
506    return &mdb->fields[mdb->field_number++];
507 }
508
509 void my_ingres_data_seek(B_DB *mdb, int row)
510 {
511    // set the row number to be returned on the next call
512    // to my_ingres_fetch_row
513    mdb->row_number = row;
514 }
515
516 void my_ingres_field_seek(B_DB *mdb, int field)
517 {
518    mdb->field_number = field;
519 }
520
521 /*
522  * Note, if this routine returns 1 (failure), Bacula expects
523  *  that no result has been stored.
524  *
525  *  Returns:  0  on success
526  *            1  on failure
527  *
528  */
529 int my_ingres_query(B_DB *mdb, const char *query)
530 {
531    const char *new_query;
532
533    if (strstr(query, "LIMIT") != NULL) {
534       new_query = mdb->limit_filter->replace(query);
535    } else {
536       new_query = query;
537    }
538
539    Dmsg0(500, "my_ingres_query started\n");
540    // We are starting a new query.  reset everything.
541    mdb->num_rows     = -1;
542    mdb->row_number   = -1;
543    mdb->field_number = -1;
544
545    int cols = -1;
546
547    if (mdb->result) {
548       INGclear(mdb->result);  /* hmm, someone forgot to free?? */
549       mdb->result = NULL;
550    }
551
552    Dmsg1(500, "my_ingres_query starts with '%s'\n", new_query);
553
554    /*
555     * See if we are getting a transaction start or end.
556     */
557    if (strcasecmp(new_query, "BEGIN") != NULL) {
558       /*
559        * Start of a transaction.
560        */
561       Dmsg0(500,"my_ingres_query: Start of transaction\n");
562       mdb->transaction = true;
563       return 0;
564    } else if (strcasecmp(new_query, "COMMIT") != NULL) {
565       /*
566        * End of a transaction.
567        */
568       Dmsg0(500,"my_ingres_query: End of transaction, commiting work\n");
569       mdb->transaction = false;
570       INGcommit(mdb->db);
571       return 0;
572    }
573
574    /* TODO: differentiate between SELECTs and other queries */
575
576    if ((cols = INGgetCols(mdb->db, new_query, mdb->transaction)) <= 0) {
577       if (cols < 0 ) {
578          Dmsg0(500,"my_ingres_query: neg.columns: no DML stmt!\n");
579       }
580       Dmsg0(500,"my_ingres_query (non SELECT) starting...\n");
581       /* non SELECT */
582       mdb->num_rows = INGexec(mdb->db, new_query, mdb->transaction);
583       if (INGcheck()) {
584         Dmsg0(500,"my_ingres_query (non SELECT) went wrong\n");
585         mdb->status = 1;
586       } else {
587         Dmsg0(500,"my_ingres_query (non SELECT) seems ok\n");
588         mdb->status = 0;
589       }
590    } else {
591       /* SELECT */
592       Dmsg0(500,"my_ingres_query (SELECT) starting...\n");
593       mdb->result = INGquery(mdb->db, new_query, mdb->transaction);
594       if (mdb->result != NULL) {
595         Dmsg1(500, "we have a result\n", new_query);
596
597         // how many fields in the set?
598         mdb->num_fields = (int)INGnfields(mdb->result);
599         Dmsg1(500, "we have %d fields\n", mdb->num_fields);
600
601         mdb->num_rows = INGntuples(mdb->result);
602         Dmsg1(500, "we have %d rows\n", mdb->num_rows);
603
604         mdb->status = 0;                  /* succeed */
605       } else {
606         Dmsg0(500, "No resultset...\n");
607         mdb->status = 1; /* failed */
608       }
609    }
610
611    Dmsg0(500, "my_ingres_query finishing\n");
612    return mdb->status;
613 }
614
615 void my_ingres_free_result(B_DB *mdb)
616 {
617    
618    db_lock(mdb);
619    if (mdb->result) {
620       INGclear(mdb->result);
621       mdb->result = NULL;
622    }
623
624    if (mdb->row) {
625       free(mdb->row);
626       mdb->row = NULL;
627    }
628
629    if (mdb->fields) {
630       free(mdb->fields);
631       mdb->fields = NULL;
632    }
633    db_unlock(mdb);
634 }
635
636 int my_ingres_currval(B_DB *mdb, const char *table_name)
637 {
638    /*
639     * Obtain the current value of the sequence that
640     * provides the serial value for primary key of the table.
641     *
642     * currval is local to our session. It is not affected by
643     * other transactions.
644     *
645     * Determine the name of the sequence.
646     * As we name all sequences as <table>_seq this is easy.
647     */
648
649    char sequence[64];
650    char query[256];
651    INGresult *result;
652    int id = 0;
653
654    bstrncpy(sequence, table_name, sizeof(sequence));
655    bstrncat(sequence, "_seq", sizeof(sequence));
656
657    bsnprintf(query, sizeof(query), "SELECT %s.currval FROM %s", sequence, table_name);
658
659    Dmsg1(500, "my_ingres_currval invoked with '%s'\n", query);
660
661    result = INGquery(mdb->db, query, mdb->transaction);
662
663    if (!result) {
664       Dmsg1(50, "Query failed: %s\n", query);
665       goto bail_out;
666    }
667
668    Dmsg0(500, "exec done");
669
670    id = atoi(INGgetvalue(result, 0, 0));
671
672 bail_out:
673    INGclear(result);
674
675    return id;
676 }
677
678 #ifdef HAVE_BATCH_FILE_INSERT
679 int my_ingres_batch_start(JCR *jcr, B_DB *mdb)
680 {
681    bool ok;
682
683    db_lock(mdb);
684    ok = db_sql_query(mdb,
685              "DECLARE GLOBAL TEMPORARY TABLE batch ("
686                 "FileIndex integer,"
687                 "JobId integer,"
688                 "Path varchar(256),"
689                 "Name varchar(256),"
690                 "LStat varchar(256),"
691                 "MD5 varchar(256))"
692                 " ON COMMIT PRESERVE ROWS WITH NORECOVERY",NULL, NULL);
693    db_unlock(mdb);
694    return ok;
695 }
696
697 int my_ingres_batch_end(JCR *jcr, B_DB *mdb, const char *error)
698 {
699    if (mdb) {
700       mdb->status = 0;
701    }
702    return true;
703 }
704
705 int my_ingres_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
706 {
707    size_t len;
708    const char *digest;
709    char ed1[50];
710
711    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
712    db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
713
714    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
715    db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
716
717    if (ar->Digest == NULL || ar->Digest[0] == 0) {
718       digest = "0";
719    } else {
720       digest = ar->Digest;
721    }
722
723    len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
724               ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
725               mdb->esc_name, ar->attr, digest);
726
727    return INSERT_DB(jcr, mdb, mdb->cmd);
728 }
729 #endif /* HAVE_BATCH_FILE_INSERT */
730
731 /*
732  * Escape strings so that Ingres is happy on COPY
733  *
734  *   NOTE! len is the length of the old string. Your new
735  *         string must be long enough (max 2*old+1) to hold
736  *         the escaped output.
737  */
738 char *my_ingres_copy_escape(char *dest, char *src, size_t len)
739 {
740    /* we have to escape \t, \n, \r, \ */
741    char c = '\0' ;
742
743    while (len > 0 && *src) {
744       switch (*src) {
745       case '\n':
746          c = 'n';
747          break;
748       case '\\':
749          c = '\\';
750          break;
751       case '\t':
752          c = 't';
753          break;
754       case '\r':
755          c = 'r';
756          break;
757       default:
758          c = '\0' ;
759       }
760
761       if (c) {
762          *dest = '\\';
763          dest++;
764          *dest = c;
765       } else {
766          *dest = *src;
767       }
768
769       len--;
770       src++;
771       dest++;
772    }
773
774    *dest = '\0';
775    return dest;
776 }
777
778 #ifdef HAVE_BATCH_FILE_INSERT
779 const char *my_ingres_batch_lock_path_query = "BEGIN";
780
781 const char *my_ingres_batch_lock_filename_query = "BEGIN";
782
783 const char *my_ingres_batch_unlock_tables_query = "COMMIT";
784
785 const char *my_ingres_batch_fill_path_query = 
786    "INSERT INTO Path (Path) "
787     "SELECT a.Path FROM "
788      "(SELECT DISTINCT Path FROM batch) AS a "
789       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
790
791
792 const char *my_ingres_batch_fill_filename_query = 
793    "INSERT INTO Filename (Name) "
794     "SELECT a.Name FROM "
795      "(SELECT DISTINCT Name FROM batch) as a "
796       "WHERE NOT EXISTS "
797        "(SELECT Name FROM Filename WHERE Name = a.Name)";
798 #endif /* HAVE_BATCH_FILE_INSERT */
799
800 #endif /* HAVE_INGRES */