]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/ingres.c
Drop have_insert_id in mdb as its always true for all backends.
[bacula/bacula] / bacula / src / cats / ingres.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2003-2010 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database routines specific to Ingres
30  *   These are Ingres specific routines
31  *
32  *    Stefan Reddig, June 2009 with help of Marco van Wieringen April 2010
33  *    based uopn work done 
34  *    by Dan Langille, December 2003 and
35  *    by Kern Sibbald, March 2000
36  *
37  */
38
39
40 /* The following is necessary so that we do not include
41  * the dummy external definition of DB.
42  */
43 #define __SQL_C                       /* indicate that this is sql.c */
44
45 #include "bacula.h"
46 #include "cats.h"
47
48 #ifdef HAVE_INGRES
49
50 #include "myingres.h"
51
52 /* -----------------------------------------------------------------------
53  *
54  *   Ingres dependent defines and subroutines
55  *
56  * -----------------------------------------------------------------------
57  */
58
59 /* List of open databases */
60 static BQUEUE db_list = {&db_list, &db_list};
61
62 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
63
64 /*
65  * Retrieve database type
66  */
67 const char *
68 db_get_type(void)
69 {
70    return "Ingres";
71 }
72
73 /*
74  * Initialize database data structure. In principal this should
75  * never have errors, or it is really fatal.
76  */
77 B_DB *
78 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
79                  const char *db_address, int db_port, const char *db_socket,
80                  int mult_db_connections)
81 {
82    B_DB *mdb;
83    int next_session_id = 0;
84
85    if (!db_user) {
86       Jmsg(jcr, M_FATAL, 0, _("A user name for Ingres must be supplied.\n"));
87       return NULL;
88    }
89    P(mutex);                          /* lock DB queue */
90    if (!mult_db_connections) {
91       /* Look to see if DB already open */
92       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
93          if (bstrcmp(mdb->db_name, db_name) &&
94              bstrcmp(mdb->db_address, db_address) &&
95              mdb->db_port == db_port) {
96             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
97             mdb->ref_count++;
98             V(mutex);
99             return mdb;                  /* already open */
100          }
101
102          if (mdb->session_id > next_session_id) {
103             next_session_id = mdb->session_id;
104          }
105       }
106    } else {
107       /*
108        * See what the next available session_id is.
109        * We first see what the highest session_id is used now.
110        */
111       foreach_dlist(mdb, db_list) {
112          if (mdb->session_id > next_session_id) {
113             next_session_id = mdb->session_id;
114          }
115       }
116    }
117    Dmsg0(100, "db_open first time\n");
118    mdb = (B_DB *)malloc(sizeof(B_DB));
119    memset(mdb, 0, sizeof(B_DB));
120    mdb->db_name = bstrdup(db_name);
121    mdb->db_user = bstrdup(db_user);
122    if (db_password) {
123       mdb->db_password = bstrdup(db_password);
124    }
125    if (db_address) {
126       mdb->db_address  = bstrdup(db_address);
127    }
128    if (db_socket) {
129       mdb->db_socket   = bstrdup(db_socket);
130    }
131    mdb->db_port        = db_port;
132    mdb->session_id     = ++next_session_id;
133    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
134    *mdb->errmsg        = 0;
135    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
136    mdb->cached_path    = get_pool_memory(PM_FNAME);
137    mdb->cached_path_id = 0;
138    mdb->ref_count      = 1;
139    mdb->fname          = get_pool_memory(PM_FNAME);
140    mdb->path           = get_pool_memory(PM_FNAME);
141    mdb->esc_name       = get_pool_memory(PM_FNAME);
142    mdb->esc_path      = get_pool_memory(PM_FNAME);
143    mdb->allow_transactions = mult_db_connections;
144    mdb->limit_filter = new_bregexp("/LIMIT ([0-9]+)/FETCH FIRST $1 ROW ONLY/g");
145    qinsert(&db_list, &mdb->bq);            /* put db in list */
146    V(mutex);
147    return mdb;
148 }
149
150 /* Check that the database correspond to the encoding we want */
151 static bool check_database_encoding(JCR *jcr, B_DB *mdb)
152 {
153 /* SRE: TODO! Needed?
154  SQL_ROW row;
155    int ret=false;
156
157    if (!db_sql_query(mdb, "SELECT getdatabaseencoding()", NULL, NULL)) {
158       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
159       return false;
160    }
161
162    if ((row = sql_fetch_row(mdb)) == NULL) {
163       Mmsg1(mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
164       Jmsg(jcr, M_ERROR, 0, "Can't check database encoding %s", mdb->errmsg);
165    } else {
166       ret = bstrcmp(row[0], "SQL_ASCII");
167       if (!ret) {
168          Mmsg(mdb->errmsg, 
169               _("Encoding error for database \"%s\". Wanted SQL_ASCII, got %s\n"),
170               mdb->db_name, row[0]);
171          Jmsg(jcr, M_WARNING, 0, "%s", mdb->errmsg);
172          Dmsg1(50, "%s", mdb->errmsg);
173       } 
174    }
175    return ret;
176 */
177     return true;
178 }
179
180 /*
181  * Now actually open the database.  This can generate errors,
182  *   which are returned in the errmsg
183  *
184  * DO NOT close the database or free(mdb) here !!!!
185  */
186 int
187 db_open_database(JCR *jcr, B_DB *mdb)
188 {
189    int errstat;
190    char buf[10], *port;
191
192    P(mutex);
193    if (mdb->connected) {
194       V(mutex);
195       return 1;
196    }
197    mdb->connected = false;
198
199    if ((errstat=rwl_init(&mdb->lock)) != 0) {
200       berrno be;
201       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
202             be.bstrerror(errstat));
203       V(mutex);
204       return 0;
205    }
206
207    if (mdb->db_port) {
208       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
209       port = buf;
210    } else {
211       port = NULL;
212    }
213
214    mdb->db = INGconnectDB(mdb->db_name, mdb->db_user, mdb->db_password, mdb->session_id);
215
216    Dmsg0(50, "Ingres real CONNECT done\n");
217    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
218             mdb->db_password==NULL?"(NULL)":mdb->db_password);
219
220    if (!mdb->db) {
221       Mmsg2(&mdb->errmsg, _("Unable to connect to Ingres server.\n"
222             "Database=%s User=%s\n"
223             "It is probably not running or your password is incorrect.\n"),
224              mdb->db_name, mdb->db_user);
225       V(mutex);
226       return 0;
227    }
228
229    mdb->connected = true;
230
231    if (!check_tables_version(jcr, mdb)) {
232       V(mutex);
233       return 0;
234    }
235
236    //sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
237    
238    /* check that encoding is SQL_ASCII */
239    check_database_encoding(jcr, mdb);
240
241    V(mutex);
242    return 1;
243 }
244
245 void
246 db_close_database(JCR *jcr, B_DB *mdb)
247 {
248    if (!mdb) {
249       return;
250    }
251    db_end_transaction(jcr, mdb);
252    P(mutex);
253    sql_free_result(mdb);
254    mdb->ref_count--;
255    if (mdb->ref_count == 0) {
256       qdchain(&mdb->bq);
257       if (mdb->connected && mdb->db) {
258          sql_close(mdb);
259       }
260       rwl_destroy(&mdb->lock);
261       free_pool_memory(mdb->errmsg);
262       free_pool_memory(mdb->cmd);
263       free_pool_memory(mdb->cached_path);
264       free_pool_memory(mdb->fname);
265       free_pool_memory(mdb->path);
266       free_pool_memory(mdb->esc_name);
267       free_pool_memory(mdb->esc_path);
268       free_bregexp(mdb->limit_filter);
269       if (mdb->db_name) {
270          free(mdb->db_name);
271       }
272       if (mdb->db_user) {
273          free(mdb->db_user);
274       }
275       if (mdb->db_password) {
276          free(mdb->db_password);
277       }
278       if (mdb->db_address) {
279          free(mdb->db_address);
280       }
281       if (mdb->db_socket) {
282          free(mdb->db_socket);
283       }
284       free(mdb);
285    }
286    V(mutex);
287 }
288
289 void db_check_backend_thread_safe()
290 { }
291
292
293
294 void db_thread_cleanup()
295 { }
296
297 /*
298  * Return the next unique index (auto-increment) for
299  * the given table.  Return NULL on error.
300  *
301  * For Ingres, NULL causes the auto-increment value     SRE: true?
302  *  to be updated.
303  */
304 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
305 {
306    strcpy(index, "NULL");
307    return 1;
308 }
309
310
311 /*
312  * Escape strings so that Ingres is happy
313  *
314  *   NOTE! len is the length of the old string. Your new
315  *         string must be long enough (max 2*old+1) to hold
316  *         the escaped output.
317  */
318 void
319 db_escape_string(JCR *jcr, B_DB *mdb, char *snew, char *old, int len)
320 {
321    char *n, *o;
322
323    n = snew;
324    o = old;
325    while (len--) {
326       switch (*o) {
327       case '\'':
328          *n++ = '\'';
329          *n++ = '\'';
330          o++;
331          break;
332       case 0:
333          *n++ = '\\';
334          *n++ = 0;
335          o++;
336          break;
337       default:
338          *n++ = *o++;
339          break;
340       }
341    }
342    *n = 0;
343 }
344
345 /*
346  * Submit a general SQL command (cmd), and for each row returned,
347  *  the sqlite_handler is called with the ctx.
348  */
349 bool db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
350 {
351    SQL_ROW row;
352
353    Dmsg0(500, "db_sql_query started\n");
354
355    db_lock(mdb);
356    if (sql_query(mdb, query) != 0) {
357       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
358       db_unlock(mdb);
359       Dmsg0(500, "db_sql_query failed\n");
360       return false;
361    }
362    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
363
364    if (result_handler != NULL) {
365       Dmsg0(500, "db_sql_query invoking handler\n");
366       if (mdb->result != NULL) {
367          int num_fields = sql_num_fields(mdb);
368
369          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
370          while ((row = sql_fetch_row(mdb)) != NULL) {
371
372             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
373             if (result_handler(ctx, num_fields, row))
374                break;
375          }
376
377         sql_free_result(mdb);
378       }
379    }
380    db_unlock(mdb);
381
382    Dmsg0(500, "db_sql_query finished\n");
383
384    return true;
385 }
386
387 /*
388  * Close database connection
389  */
390 void my_ingres_close(B_DB *mdb)
391 {
392    Dmsg0(500, "my_ingres_close closing database connection\n");
393    INGdisconnectDB(mdb->db);
394    //SRE: error handling? 
395 }
396
397 INGRES_ROW my_ingres_fetch_row(B_DB *mdb)
398 {
399    int j;
400    INGRES_ROW row = NULL; // by default, return NULL
401
402    if (!mdb->result) {
403       return row;
404    }
405    if (mdb->result->num_rows <= 0) {
406       return row;
407    }
408
409    Dmsg0(500, "my_ingres_fetch_row start\n");
410
411    if (!mdb->row || mdb->row_size < mdb->num_fields) {
412       int num_fields = mdb->num_fields;
413       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
414
415       if (mdb->row) {
416          Dmsg0(500, "my_ingres_fetch_row freeing space\n");
417          free(mdb->row);
418       }
419       num_fields += 20;                  /* add a bit extra */
420       mdb->row = (INGRES_ROW)malloc(sizeof(char *) * num_fields);
421       mdb->row_size = num_fields;
422
423       // now reset the row_number now that we have the space allocated
424       //mdb->row_number = 1;
425       mdb->row_number = 0;
426    }
427
428    // if still within the result set
429    //if (mdb->row_number <= mdb->num_rows) {
430    if (mdb->row_number < mdb->num_rows) {
431       Dmsg2(500, "my_ingres_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
432       // get each value from this row
433       for (j = 0; j < mdb->num_fields; j++) {
434          mdb->row[j] = INGgetvalue(mdb->result, mdb->row_number, j);
435          Dmsg2(500, "my_ingres_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
436       }
437       // increment the row number for the next call
438       mdb->row_number++;
439
440       row = mdb->row;
441    } else {
442       Dmsg2(500, "my_ingres_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
443    }
444
445    Dmsg1(500, "my_ingres_fetch_row finishes returning %p\n", row);
446
447    return row;
448 }
449
450
451 int my_ingres_max_length(B_DB *mdb, int field_num) {
452    //
453    // for a given column, find the max length
454    //
455    int max_length;
456    int i;
457    int this_length;
458
459    max_length = 0;
460    for (i = 0; i < mdb->num_rows; i++) {
461       if (INGgetisnull(mdb->result, i, field_num)) {
462           this_length = 4;        // "NULL"
463       } else {
464           this_length = cstrlen(INGgetvalue(mdb->result, i, field_num));
465       }
466
467       if (max_length < this_length) {
468           max_length = this_length;
469       }
470    }
471
472    return max_length;
473 }
474
475 INGRES_FIELD * my_ingres_fetch_field(B_DB *mdb)
476 {
477    int i;
478
479    Dmsg0(500, "my_ingres_fetch_field starts\n");
480
481    if (!mdb->fields || mdb->fields_size < mdb->num_fields) {
482       if (mdb->fields) {
483          free(mdb->fields);
484       }
485       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
486       mdb->fields = (INGRES_FIELD *)malloc(sizeof(INGRES_FIELD) * mdb->num_fields);
487       mdb->fields_size = mdb->num_fields;
488
489       for (i = 0; i < mdb->num_fields; i++) {
490          Dmsg1(500, "filling field %d\n", i);
491          strcpy(mdb->fields[i].name,INGfname(mdb->result, i));
492          mdb->fields[i].max_length = my_ingres_max_length(mdb, i);
493          mdb->fields[i].type       = INGftype(mdb->result, i);
494          mdb->fields[i].flags      = 0;
495
496          Dmsg4(500, "my_ingres_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
497             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
498             mdb->fields[i].flags);
499       } // end for
500    } // end if
501
502    // increment field number for the next time around
503
504    Dmsg0(500, "my_ingres_fetch_field finishes\n");
505    return &mdb->fields[mdb->field_number++];
506 }
507
508 void my_ingres_data_seek(B_DB *mdb, int row)
509 {
510    // set the row number to be returned on the next call
511    // to my_ingres_fetch_row
512    mdb->row_number = row;
513 }
514
515 void my_ingres_field_seek(B_DB *mdb, int field)
516 {
517    mdb->field_number = field;
518 }
519
520 /*
521  * Note, if this routine returns 1 (failure), Bacula expects
522  *  that no result has been stored.
523  *
524  *  Returns:  0  on success
525  *            1  on failure
526  *
527  */
528 int my_ingres_query(B_DB *mdb, const char *query)
529 {
530    const char *new_query;
531
532    if (strstr(query, "LIMIT") != NULL) {
533       new_query = mdb->limit_filter->replace(query);
534    } else {
535       new_query = query;
536    }
537
538    Dmsg0(500, "my_ingres_query started\n");
539    // We are starting a new query.  reset everything.
540    mdb->num_rows     = -1;
541    mdb->row_number   = -1;
542    mdb->field_number = -1;
543
544    int cols = -1;
545
546    if (mdb->result) {
547       INGclear(mdb->result);  /* hmm, someone forgot to free?? */
548       mdb->result = NULL;
549    }
550
551    Dmsg1(500, "my_ingres_query starts with '%s'\n", new_query);
552
553    /*
554     * See if we are getting a transaction start or end.
555     */
556    if (!strcasecmp(new_query, "BEGIN")) {
557       /*
558        * Start of a transaction.
559        */
560       Dmsg0(500,"my_ingres_query: Start of transaction\n");
561       mdb->transaction = true;
562       return 0;
563    } else if (!strcasecmp(new_query, "COMMIT")) {
564       /*
565        * End of a transaction.
566        */
567       Dmsg0(500,"my_ingres_query: End of transaction, commiting work\n");
568       mdb->transaction = false;
569       INGcommit(mdb->db);
570       return 0;
571    }
572
573    /* TODO: differentiate between SELECTs and other queries */
574
575    if ((cols = INGgetCols(mdb->db, new_query, mdb->transaction)) <= 0) {
576       if (cols < 0 ) {
577          Dmsg0(500,"my_ingres_query: neg.columns: no DML stmt!\n");
578       }
579       Dmsg0(500,"my_ingres_query (non SELECT) starting...\n");
580       /* non SELECT */
581       mdb->num_rows = INGexec(mdb->db, new_query, mdb->transaction);
582       if (INGcheck()) {
583         Dmsg0(500,"my_ingres_query (non SELECT) went wrong\n");
584         mdb->status = 1;
585       } else {
586         Dmsg0(500,"my_ingres_query (non SELECT) seems ok\n");
587         mdb->status = 0;
588       }
589    } else {
590       /* SELECT */
591       Dmsg0(500,"my_ingres_query (SELECT) starting...\n");
592       mdb->result = INGquery(mdb->db, new_query, mdb->transaction);
593       if (mdb->result != NULL) {
594         Dmsg1(500, "we have a result\n", new_query);
595
596         // how many fields in the set?
597         mdb->num_fields = (int)INGnfields(mdb->result);
598         Dmsg1(500, "we have %d fields\n", mdb->num_fields);
599
600         mdb->num_rows = INGntuples(mdb->result);
601         Dmsg1(500, "we have %d rows\n", mdb->num_rows);
602
603         mdb->status = 0;                  /* succeed */
604       } else {
605         Dmsg0(500, "No resultset...\n");
606         mdb->status = 1; /* failed */
607       }
608    }
609
610    Dmsg0(500, "my_ingres_query finishing\n");
611    return mdb->status;
612 }
613
614 void my_ingres_free_result(B_DB *mdb)
615 {
616    
617    db_lock(mdb);
618    if (mdb->result) {
619       INGclear(mdb->result);
620       mdb->result = NULL;
621    }
622
623    if (mdb->row) {
624       free(mdb->row);
625       mdb->row = NULL;
626    }
627
628    if (mdb->fields) {
629       free(mdb->fields);
630       mdb->fields = NULL;
631    }
632    db_unlock(mdb);
633 }
634
635 int my_ingres_currval(B_DB *mdb, const char *table_name)
636 {
637    /*
638     * Obtain the current value of the sequence that
639     * provides the serial value for primary key of the table.
640     *
641     * currval is local to our session. It is not affected by
642     * other transactions.
643     *
644     * Determine the name of the sequence.
645     * As we name all sequences as <table>_seq this is easy.
646     */
647
648    char sequence[64];
649    char query[256];
650    INGresult *result;
651    int id = 0;
652
653    bstrncpy(sequence, table_name, sizeof(sequence));
654    bstrncat(sequence, "_seq", sizeof(sequence));
655
656    bsnprintf(query, sizeof(query), "SELECT %s.currval FROM %s", sequence, table_name);
657
658    Dmsg1(500, "my_ingres_currval invoked with '%s'\n", query);
659
660    result = INGquery(mdb->db, query, mdb->transaction);
661
662    if (!result) {
663       Dmsg1(50, "Query failed: %s\n", query);
664       goto bail_out;
665    }
666
667    Dmsg0(500, "exec done");
668
669    id = atoi(INGgetvalue(result, 0, 0));
670
671 bail_out:
672    INGclear(result);
673
674    return id;
675 }
676
677 #ifdef HAVE_BATCH_FILE_INSERT
678 int my_ingres_batch_start(JCR *jcr, B_DB *mdb)
679 {
680    bool ok;
681
682    db_lock(mdb);
683    ok = db_sql_query(mdb,
684              "DECLARE GLOBAL TEMPORARY TABLE batch ("
685                 "FileIndex integer,"
686                 "JobId integer,"
687                 "Path varchar(256),"
688                 "Name varchar(256),"
689                 "LStat varchar(256),"
690                 "MD5 varchar(256))"
691                 " ON COMMIT PRESERVE ROWS WITH NORECOVERY",NULL, NULL);
692    db_unlock(mdb);
693    return ok;
694 }
695
696 int my_ingres_batch_end(JCR *jcr, B_DB *mdb, const char *error)
697 {
698    if (mdb) {
699       mdb->status = 0;
700    }
701    return true;
702 }
703
704 int my_ingres_batch_insert(JCR *jcr, B_DB *mdb, ATTR_DBR *ar)
705 {
706    size_t len;
707    const char *digest;
708    char ed1[50];
709
710    mdb->esc_name = check_pool_memory_size(mdb->esc_name, mdb->fnl*2+1);
711    db_escape_string(jcr, mdb, mdb->esc_name, mdb->fname, mdb->fnl);
712
713    mdb->esc_path = check_pool_memory_size(mdb->esc_path, mdb->pnl*2+1);
714    db_escape_string(jcr, mdb, mdb->esc_path, mdb->path, mdb->pnl);
715
716    if (ar->Digest == NULL || ar->Digest[0] == 0) {
717       digest = "0";
718    } else {
719       digest = ar->Digest;
720    }
721
722    len = Mmsg(mdb->cmd, "INSERT INTO batch VALUES (%u,%s,'%s','%s','%s','%s')",
723               ar->FileIndex, edit_int64(ar->JobId,ed1), mdb->esc_path,
724               mdb->esc_name, ar->attr, digest);
725
726    return INSERT_DB(jcr, mdb, mdb->cmd);
727 }
728 #endif /* HAVE_BATCH_FILE_INSERT */
729
730 /*
731  * Escape strings so that Ingres is happy on COPY
732  *
733  *   NOTE! len is the length of the old string. Your new
734  *         string must be long enough (max 2*old+1) to hold
735  *         the escaped output.
736  */
737 char *my_ingres_copy_escape(char *dest, char *src, size_t len)
738 {
739    /* we have to escape \t, \n, \r, \ */
740    char c = '\0' ;
741
742    while (len > 0 && *src) {
743       switch (*src) {
744       case '\n':
745          c = 'n';
746          break;
747       case '\\':
748          c = '\\';
749          break;
750       case '\t':
751          c = 't';
752          break;
753       case '\r':
754          c = 'r';
755          break;
756       default:
757          c = '\0' ;
758       }
759
760       if (c) {
761          *dest = '\\';
762          dest++;
763          *dest = c;
764       } else {
765          *dest = *src;
766       }
767
768       len--;
769       src++;
770       dest++;
771    }
772
773    *dest = '\0';
774    return dest;
775 }
776
777 #ifdef HAVE_BATCH_FILE_INSERT
778 const char *my_ingres_batch_lock_path_query = "BEGIN";
779
780 const char *my_ingres_batch_lock_filename_query = "BEGIN";
781
782 const char *my_ingres_batch_unlock_tables_query = "COMMIT";
783
784 const char *my_ingres_batch_fill_path_query = 
785    "INSERT INTO Path (Path) "
786     "SELECT a.Path FROM "
787      "(SELECT DISTINCT Path FROM batch) AS a "
788       "WHERE NOT EXISTS (SELECT Path FROM Path WHERE Path = a.Path) ";
789
790
791 const char *my_ingres_batch_fill_filename_query = 
792    "INSERT INTO Filename (Name) "
793     "SELECT a.Name FROM "
794      "(SELECT DISTINCT Name FROM batch) as a "
795       "WHERE NOT EXISTS "
796        "(SELECT Name FROM Filename WHERE Name = a.Name)";
797 #endif /* HAVE_BATCH_FILE_INSERT */
798
799 #endif /* HAVE_INGRES */