]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/sql.c
kes Implement --with-batch-insert in configure and detection of thread
[bacula/bacula] / bacula / src / cats / sql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation plus additions
11    that are listed in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database interface routines
30  *
31  *     Almost generic set of SQL database interface routines
32  *      (with a little more work)
33  *
34  *    Kern Sibbald, March 2000
35  *
36  *    Version $Id$
37  */
38
39 /* The following is necessary so that we do not include
40  * the dummy external definition of B_DB.
41  */
42 #define __SQL_C                       /* indicate that this is sql.c */
43
44 #include "bacula.h"
45 #include "cats.h"
46
47 #if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL
48
49 uint32_t bacula_db_version = 0;
50
51 /* Forward referenced subroutines */
52 void print_dashes(B_DB *mdb);
53 void print_result(B_DB *mdb);
54
55 /*
56  * Called here to retrieve an integer from the database
57  */
58 static int int_handler(void *ctx, int num_fields, char **row)
59 {
60    uint32_t *val = (uint32_t *)ctx;
61
62    Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
63
64    if (row[0]) {
65       Dmsg1(800, "int_handler finds '%s'\n", row[0]);
66       *val = str_to_int64(row[0]);
67    } else {
68       Dmsg0(800, "int_handler finds zero\n");
69       *val = 0;
70    }
71    Dmsg0(800, "int_handler finishes\n");
72    return 0;
73 }
74
75 /*
76  * Called here to retrieve a 32/64 bit integer from the database.
77  *   The returned integer will be extended to 64 bit.
78  */
79 int db_int64_handler(void *ctx, int num_fields, char **row)
80 {
81    db_int64_ctx *lctx = (db_int64_ctx *)ctx;
82
83    if (row[0]) {
84       lctx->value = str_to_int64(row[0]);
85       lctx->count++;
86    }
87    return 0;
88 }
89
90
91
92 /* NOTE!!! The following routines expect that the
93  *  calling subroutine sets and clears the mutex
94  */
95
96 /* Check that the tables correspond to the version we want */
97 bool check_tables_version(JCR *jcr, B_DB *mdb)
98 {
99    const char *query = "SELECT VersionId FROM Version";
100
101    bacula_db_version = 0;
102    if (!db_sql_query(mdb, query, int_handler, (void *)&bacula_db_version)) {
103       Mmsg(mdb->errmsg, "Database not created or server not running.\n");
104       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
105       return false;
106    }
107    if (bacula_db_version != BDB_VERSION) {
108       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
109           mdb->db_name, BDB_VERSION, bacula_db_version);
110       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
111       return false;
112    }
113    return true;
114 }
115
116 /* Utility routine for queries. The database MUST be locked before calling here. */
117 int
118 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
119 {
120    int status;
121
122    sql_free_result(mdb);
123    if ((status=sql_query(mdb, cmd)) != 0) {
124       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
125       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
126       if (verbose) {
127          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
128       }
129       return 0;
130    }
131
132    mdb->result = sql_store_result(mdb);
133
134    return mdb->result != NULL;
135 }
136
137 /*
138  * Utility routine to do inserts
139  * Returns: 0 on failure
140  *          1 on success
141  */
142 int
143 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
144 {
145    if (sql_query(mdb, cmd)) {
146       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
147       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
148       if (verbose) {
149          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
150       }
151       return 0;
152    }
153    if (mdb->have_insert_id) {
154       mdb->num_rows = sql_affected_rows(mdb);
155    } else {
156       mdb->num_rows = 1;
157    }
158    if (mdb->num_rows != 1) {
159       char ed1[30];
160       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
161          edit_uint64(mdb->num_rows, ed1));
162       if (verbose) {
163          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
164       }
165       return 0;
166    }
167    mdb->changes++;
168    return 1;
169 }
170
171 /* Utility routine for updates.
172  *  Returns: 0 on failure
173  *           1 on success
174  */
175 int
176 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
177 {
178
179    if (sql_query(mdb, cmd)) {
180       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
181       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
182       if (verbose) {
183          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
184       }
185       return 0;
186    }
187    mdb->num_rows = sql_affected_rows(mdb);
188    if (mdb->num_rows < 1) {
189       char ed1[30];
190       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
191          edit_uint64(mdb->num_rows, ed1), cmd);
192       if (verbose) {
193 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
194       }
195       return 0;
196    }
197    mdb->changes++;
198    return 1;
199 }
200
201 /* Utility routine for deletes
202  *
203  * Returns: -1 on error
204  *           n number of rows affected
205  */
206 int
207 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
208 {
209
210    if (sql_query(mdb, cmd)) {
211       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
212       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
213       if (verbose) {
214          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
215       }
216       return -1;
217    }
218    mdb->changes++;
219    return sql_affected_rows(mdb);
220 }
221
222
223 /*
224  * Get record max. Query is already in mdb->cmd
225  *  No locking done
226  *
227  * Returns: -1 on failure
228  *          count on success
229  */
230 int get_sql_record_max(JCR *jcr, B_DB *mdb)
231 {
232    SQL_ROW row;
233    int stat = 0;
234
235    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
236       if ((row = sql_fetch_row(mdb)) == NULL) {
237          Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
238          stat = -1;
239       } else {
240          stat = str_to_int64(row[0]);
241       }
242       sql_free_result(mdb);
243    } else {
244       Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
245       stat = -1;
246    }
247    return stat;
248 }
249
250 /*
251  * Return pre-edited error message
252  */
253 char *db_strerror(B_DB *mdb)
254 {
255    return mdb->errmsg;
256 }
257
258 /*
259  * Lock database, this can be called multiple times by the same
260  *   thread without blocking, but must be unlocked the number of
261  *   times it was locked.
262  */
263 void _db_lock(const char *file, int line, B_DB *mdb)
264 {
265    int errstat;
266    if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
267       berrno be;
268       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
269            errstat, be.bstrerror(errstat));
270    }
271 }
272
273 /*
274  * Unlock the database. This can be called multiple times by the
275  *   same thread up to the number of times that thread called
276  *   db_lock()/
277  */
278 void _db_unlock(const char *file, int line, B_DB *mdb)
279 {
280    int errstat;
281    if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
282       berrno be;
283       e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
284            errstat, be.bstrerror(errstat));
285    }
286 }
287
288 /*
289  * Start a transaction. This groups inserts and makes things
290  *  much more efficient. Usually started when inserting
291  *  file attributes.
292  */
293 void db_start_transaction(JCR *jcr, B_DB *mdb)
294 {
295    if (!jcr->attr) {
296       jcr->attr = get_pool_memory(PM_FNAME);
297    }
298    if (!jcr->ar) {
299       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
300    }
301
302 #ifdef HAVE_SQLITE
303    if (!mdb->allow_transactions) {
304       return;
305    }
306    db_lock(mdb);
307    /* Allow only 10,000 changes per transaction */
308    if (mdb->transaction && mdb->changes > 10000) {
309       db_end_transaction(jcr, mdb);
310    }
311    if (!mdb->transaction) {
312       my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
313       Dmsg0(400, "Start SQLite transaction\n");
314       mdb->transaction = 1;
315    }
316    db_unlock(mdb);
317 #endif
318
319 /*
320  * This is turned off because transactions break
321  * if multiple simultaneous jobs are run.
322  */
323 #ifdef HAVE_POSTGRESQL
324    if (!mdb->allow_transactions) {
325       return;
326    }
327    db_lock(mdb);
328    /* Allow only 25,000 changes per transaction */
329    if (mdb->transaction && mdb->changes > 25000) {
330       db_end_transaction(jcr, mdb);
331    }
332    if (!mdb->transaction) {
333       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
334       Dmsg0(400, "Start PosgreSQL transaction\n");
335       mdb->transaction = 1;
336    }
337    db_unlock(mdb);
338 #endif
339 }
340
341 void db_end_transaction(JCR *jcr, B_DB *mdb)
342 {
343    /*
344     * This can be called during thread cleanup and
345     *   the db may already be closed.  So simply return.
346     */
347    if (!mdb) {
348       return;
349    }
350
351    if (jcr && jcr->cached_attribute) {
352       Dmsg0(400, "Flush last cached attribute.\n");
353       if (!db_create_file_attributes_record(jcr, mdb, jcr->ar)) {
354          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
355       }
356       jcr->cached_attribute = false;
357    }
358
359 #ifdef HAVE_SQLITE
360    if (!mdb->allow_transactions) {
361       return;
362    }
363    db_lock(mdb);
364    if (mdb->transaction) {
365       my_sqlite_query(mdb, "COMMIT"); /* end transaction */
366       mdb->transaction = 0;
367       Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
368    }
369    mdb->changes = 0;
370    db_unlock(mdb);
371 #endif
372
373 #ifdef HAVE_POSTGRESQL
374    if (!mdb->allow_transactions) {
375       return;
376    }
377    db_lock(mdb);
378    if (mdb->transaction) {
379       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
380       mdb->transaction = 0;
381       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
382    }
383    mdb->changes = 0;
384    db_unlock(mdb);
385 #endif
386 }
387
388 /*
389  * Given a full filename, split it into its path
390  *  and filename parts. They are returned in pool memory
391  *  in the mdb structure.
392  */
393 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
394 {
395    const char *p, *f;
396
397    /* Find path without the filename.
398     * I.e. everything after the last / is a "filename".
399     * OK, maybe it is a directory name, but we treat it like
400     * a filename. If we don't find a / then the whole name
401     * must be a path name (e.g. c:).
402     */
403    for (p=f=fname; *p; p++) {
404       if (IsPathSeparator(*p)) {
405          f = p;                       /* set pos of last slash */
406       }
407    }
408    if (IsPathSeparator(*f)) {                   /* did we find a slash? */
409       f++;                            /* yes, point to filename */
410    } else {                           /* no, whole thing must be path name */
411       f = p;
412    }
413
414    /* If filename doesn't exist (i.e. root directory), we
415     * simply create a blank name consisting of a single
416     * space. This makes handling zero length filenames
417     * easier.
418     */
419    mdb->fnl = p - f;
420    if (mdb->fnl > 0) {
421       mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
422       memcpy(mdb->fname, f, mdb->fnl);    /* copy filename */
423       mdb->fname[mdb->fnl] = 0;
424    } else {
425       mdb->fname[0] = 0;
426       mdb->fnl = 0;
427    }
428
429    mdb->pnl = f - fname;
430    if (mdb->pnl > 0) {
431       mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
432       memcpy(mdb->path, fname, mdb->pnl);
433       mdb->path[mdb->pnl] = 0;
434    } else {
435       Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
436       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
437       mdb->path[0] = 0;
438       mdb->pnl = 0;
439    }
440
441    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
442 }
443
444 /*
445  * List dashes as part of header for listing SQL results in a table
446  */
447 void
448 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
449 {
450    SQL_FIELD  *field;
451    int i, j;
452
453    sql_field_seek(mdb, 0);
454    send(ctx, "+");
455    for (i = 0; i < sql_num_fields(mdb); i++) {
456       field = sql_fetch_field(mdb);
457       for (j = 0; j < (int)field->max_length + 2; j++) {
458          send(ctx, "-");
459       }
460       send(ctx, "+");
461    }
462    send(ctx, "\n");
463 }
464
465 /*
466  * If full_list is set, we list vertically, otherwise, we
467  * list on one line horizontally.
468  */
469 void
470 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
471 {
472    SQL_FIELD *field;
473    SQL_ROW row;
474    int i, col_len, max_len = 0;
475    char buf[2000], ewc[30];
476
477    Dmsg0(800, "list_result starts\n");
478    if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
479       send(ctx, _("No results to list.\n"));
480       return;
481    }
482
483    Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
484    /* determine column display widths */
485    sql_field_seek(mdb, 0);
486    for (i = 0; i < sql_num_fields(mdb); i++) {
487       Dmsg1(800, "list_result processing field %d\n", i);
488       field = sql_fetch_field(mdb);
489       col_len = cstrlen(field->name);
490       if (type == VERT_LIST) {
491          if (col_len > max_len) {
492             max_len = col_len;
493          }
494       } else {
495          if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
496             field->max_length += (field->max_length - 1) / 3;
497          }
498          if (col_len < (int)field->max_length) {
499             col_len = field->max_length;
500          }
501          if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
502             col_len = 4;                 /* 4 = length of the word "NULL" */
503          }
504          field->max_length = col_len;    /* reset column info */
505       }
506    }
507
508    Dmsg0(800, "list_result finished first loop\n");
509    if (type == VERT_LIST) {
510       goto vertical_list;
511    }
512
513    Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
514    list_dashes(mdb, send, ctx);
515    send(ctx, "|");
516    sql_field_seek(mdb, 0);
517    for (i = 0; i < sql_num_fields(mdb); i++) {
518       Dmsg1(800, "list_result looking at field %d\n", i);
519       field = sql_fetch_field(mdb);
520       bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, field->name);
521       send(ctx, buf);
522    }
523    send(ctx, "\n");
524    list_dashes(mdb, send, ctx);
525
526    Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
527    while ((row = sql_fetch_row(mdb)) != NULL) {
528       sql_field_seek(mdb, 0);
529       send(ctx, "|");
530       for (i = 0; i < sql_num_fields(mdb); i++) {
531          field = sql_fetch_field(mdb);
532          if (row[i] == NULL) {
533             bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, "NULL");
534          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
535             bsnprintf(buf, sizeof(buf), " %*s |", (int)field->max_length,
536                       add_commas(row[i], ewc));
537          } else {
538             bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, row[i]);
539          }
540          send(ctx, buf);
541       }
542       send(ctx, "\n");
543    }
544    list_dashes(mdb, send, ctx);
545    return;
546
547 vertical_list:
548
549    Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
550    while ((row = sql_fetch_row(mdb)) != NULL) {
551       sql_field_seek(mdb, 0);
552       for (i = 0; i < sql_num_fields(mdb); i++) {
553          field = sql_fetch_field(mdb);
554          if (row[i] == NULL) {
555             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
556          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
557             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
558                 add_commas(row[i], ewc));
559          } else {
560             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
561          }
562          send(ctx, buf);
563       }
564       send(ctx, "\n");
565    }
566    return;
567 }
568
569
570 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/