]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/sql.c
ebl Use a dedicate DB link to compute and send the accurate list
[bacula/bacula] / bacula / src / cats / sql.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  * Bacula Catalog Database interface routines
30  *
31  *     Almost generic set of SQL database interface routines
32  *      (with a little more work)
33  *     SQL engine specific routines are in mysql.c, postgresql.c,
34  *       sqlite.c, ...
35  *
36  *    Kern Sibbald, March 2000
37  *
38  *    Version $Id$
39  */
40
41 /* The following is necessary so that we do not include
42  * the dummy external definition of B_DB.
43  */
44 #define __SQL_C                       /* indicate that this is sql.c */
45
46 #include "bacula.h"
47 #include "cats.h"
48
49 #if    HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL || HAVE_DBI
50
51 uint32_t bacula_db_version = 0;
52
53 int db_type = -1;
54
55 /* Forward referenced subroutines */
56 void print_dashes(B_DB *mdb);
57 void print_result(B_DB *mdb);
58
59 B_DB *db_init(JCR *jcr, const char *db_driver, const char *db_name, const char *db_user, 
60               const char *db_password, const char *db_address, int db_port, 
61               const char *db_socket, int mult_db_connections)
62 {              
63 #ifdef HAVE_DBI
64    char *p;
65    if (!db_driver) {
66       Jmsg0(jcr, M_ABORT, 0, _("Driver type not specified in Catalog resource.\n"));
67    }
68    if (strlen(db_driver) < 5 || db_driver[3] != ':' || strncasecmp(db_driver, "dbi", 3) != 0) {
69       Jmsg0(jcr, M_ABORT, 0, _("Invalid driver type, must be \"dbi:<type>\"\n"));
70    }
71    p = (char *)(db_driver + 4);      
72    if (strcasecmp(p, "mysql") == 0) {
73       db_type = SQL_TYPE_MYSQL;
74    } else if (strcasecmp(p, "postgresql") == 0) {
75       db_type = SQL_TYPE_POSTGRESQL;
76    } else if (strcasecmp(p, "sqlite") == 0) {
77       db_type = SQL_TYPE_SQLITE;
78    } else {
79       Jmsg1(jcr, M_ABORT, 0, _("Unknown database type: %s\n"), p);
80    }
81 #elif HAVE_MYSQL
82    db_type = SQL_TYPE_MYSQL;
83 #elif HAVE_POSTGRESQL
84    db_type = SQL_TYPE_POSTGRESQL;
85 #elif HAVE_SQLITE
86    db_type = SQL_TYPE_SQLITE;
87 #elif HAVE_SQLITE3
88    db_type = SQL_TYPE_SQLITE;
89 #endif
90
91    return db_init_database(jcr, db_name, db_user, db_password, db_address,
92              db_port, db_socket, mult_db_connections);
93 }
94
95 dbid_list::dbid_list() 
96 {
97    memset(this, 0, sizeof(dbid_list));
98    max_ids = 1000;
99    DBId = (DBId_t *)malloc(max_ids * sizeof(DBId_t));
100    num_ids = num_seen = tot_ids = 0;
101    PurgedFiles = NULL;
102 }
103
104 dbid_list::~dbid_list() 
105
106    free(DBId);
107 }
108
109
110 /*
111  * Called here to retrieve an integer from the database
112  */
113 static int int_handler(void *ctx, int num_fields, char **row)
114 {
115    uint32_t *val = (uint32_t *)ctx;
116
117    Dmsg1(800, "int_handler starts with row pointing at %x\n", row);
118
119    if (row[0]) {
120       Dmsg1(800, "int_handler finds '%s'\n", row[0]);
121       *val = str_to_int64(row[0]);
122    } else {
123       Dmsg0(800, "int_handler finds zero\n");
124       *val = 0;
125    }
126    Dmsg0(800, "int_handler finishes\n");
127    return 0;
128 }
129
130 /*
131  * Called here to retrieve a 32/64 bit integer from the database.
132  *   The returned integer will be extended to 64 bit.
133  */
134 int db_int64_handler(void *ctx, int num_fields, char **row)
135 {
136    db_int64_ctx *lctx = (db_int64_ctx *)ctx;
137
138    if (row[0]) {
139       lctx->value = str_to_int64(row[0]);
140       lctx->count++;
141    }
142    return 0;
143 }
144
145
146
147 /* NOTE!!! The following routines expect that the
148  *  calling subroutine sets and clears the mutex
149  */
150
151 /* Check that the tables correspond to the version we want */
152 bool check_tables_version(JCR *jcr, B_DB *mdb)
153 {
154    const char *query = "SELECT VersionId FROM Version";
155
156    bacula_db_version = 0;
157    if (!db_sql_query(mdb, query, int_handler, (void *)&bacula_db_version)) {
158       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
159       return false;
160    }
161    if (bacula_db_version != BDB_VERSION) {
162       Mmsg(mdb->errmsg, "Version error for database \"%s\". Wanted %d, got %d\n",
163           mdb->db_name, BDB_VERSION, bacula_db_version);
164       Jmsg(jcr, M_FATAL, 0, "%s", mdb->errmsg);
165       return false;
166    }
167    return true;
168 }
169
170 /* Utility routine for queries. The database MUST be locked before calling here. */
171 int
172 QueryDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
173 {
174    int status;
175
176    sql_free_result(mdb);
177    if ((status=sql_query(mdb, cmd)) != 0) {
178       m_msg(file, line, &mdb->errmsg, _("query %s failed:\n%s\n"), cmd, sql_strerror(mdb));
179       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
180       if (verbose) {
181          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
182       }
183       return 0;
184    }
185
186    mdb->result = sql_store_result(mdb);
187
188    return mdb->result != NULL;
189 }
190
191 /*
192  * Utility routine to do inserts
193  * Returns: 0 on failure
194  *          1 on success
195  */
196 int
197 InsertDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
198 {
199    if (sql_query(mdb, cmd)) {
200       m_msg(file, line, &mdb->errmsg,  _("insert %s failed:\n%s\n"), cmd, sql_strerror(mdb));
201       j_msg(file, line, jcr, M_FATAL, 0, "%s", mdb->errmsg);
202       if (verbose) {
203          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
204       }
205       return 0;
206    }
207    if (mdb->have_insert_id) {
208       mdb->num_rows = sql_affected_rows(mdb);
209    } else {
210       mdb->num_rows = 1;
211    }
212    if (mdb->num_rows != 1) {
213       char ed1[30];
214       m_msg(file, line, &mdb->errmsg, _("Insertion problem: affected_rows=%s\n"),
215          edit_uint64(mdb->num_rows, ed1));
216       if (verbose) {
217          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
218       }
219       return 0;
220    }
221    mdb->changes++;
222    return 1;
223 }
224
225 /* Utility routine for updates.
226  *  Returns: 0 on failure
227  *           1 on success
228  */
229 int
230 UpdateDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
231 {
232
233    if (sql_query(mdb, cmd)) {
234       m_msg(file, line, &mdb->errmsg, _("update %s failed:\n%s\n"), cmd, sql_strerror(mdb));
235       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
236       if (verbose) {
237          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
238       }
239       return 0;
240    }
241    mdb->num_rows = sql_affected_rows(mdb);
242    if (mdb->num_rows < 1) {
243       char ed1[30];
244       m_msg(file, line, &mdb->errmsg, _("Update failed: affected_rows=%s for %s\n"),
245          edit_uint64(mdb->num_rows, ed1), cmd);
246       if (verbose) {
247 //       j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
248       }
249       return 0;
250    }
251    mdb->changes++;
252    return 1;
253 }
254
255 /* Utility routine for deletes
256  *
257  * Returns: -1 on error
258  *           n number of rows affected
259  */
260 int
261 DeleteDB(const char *file, int line, JCR *jcr, B_DB *mdb, char *cmd)
262 {
263
264    if (sql_query(mdb, cmd)) {
265       m_msg(file, line, &mdb->errmsg, _("delete %s failed:\n%s\n"), cmd, sql_strerror(mdb));
266       j_msg(file, line, jcr, M_ERROR, 0, "%s", mdb->errmsg);
267       if (verbose) {
268          j_msg(file, line, jcr, M_INFO, 0, "%s\n", cmd);
269       }
270       return -1;
271    }
272    mdb->changes++;
273    return sql_affected_rows(mdb);
274 }
275
276
277 /*
278  * Get record max. Query is already in mdb->cmd
279  *  No locking done
280  *
281  * Returns: -1 on failure
282  *          count on success
283  */
284 int get_sql_record_max(JCR *jcr, B_DB *mdb)
285 {
286    SQL_ROW row;
287    int stat = 0;
288
289    if (QUERY_DB(jcr, mdb, mdb->cmd)) {
290       if ((row = sql_fetch_row(mdb)) == NULL) {
291          Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
292          stat = -1;
293       } else {
294          stat = str_to_int64(row[0]);
295       }
296       sql_free_result(mdb);
297    } else {
298       Mmsg1(&mdb->errmsg, _("error fetching row: %s\n"), sql_strerror(mdb));
299       stat = -1;
300    }
301    return stat;
302 }
303
304 /*
305  * Return pre-edited error message
306  */
307 char *db_strerror(B_DB *mdb)
308 {
309    return mdb->errmsg;
310 }
311
312 /*
313  * Lock database, this can be called multiple times by the same
314  *   thread without blocking, but must be unlocked the number of
315  *   times it was locked.
316  */
317 void _db_lock(const char *file, int line, B_DB *mdb)
318 {
319    int errstat;
320    if ((errstat=rwl_writelock(&mdb->lock)) != 0) {
321       berrno be;
322       e_msg(file, line, M_FATAL, 0, "rwl_writelock failure. stat=%d: ERR=%s\n",
323            errstat, be.bstrerror(errstat));
324    }
325 }
326
327 /*
328  * Unlock the database. This can be called multiple times by the
329  *   same thread up to the number of times that thread called
330  *   db_lock()/
331  */
332 void _db_unlock(const char *file, int line, B_DB *mdb)
333 {
334    int errstat;
335    if ((errstat=rwl_writeunlock(&mdb->lock)) != 0) {
336       berrno be;
337       e_msg(file, line, M_FATAL, 0, "rwl_writeunlock failure. stat=%d: ERR=%s\n",
338            errstat, be.bstrerror(errstat));
339    }
340 }
341
342 /*
343  * Start a transaction. This groups inserts and makes things
344  *  much more efficient. Usually started when inserting
345  *  file attributes.
346  */
347 void db_start_transaction(JCR *jcr, B_DB *mdb)
348 {
349    if (!jcr->attr) {
350       jcr->attr = get_pool_memory(PM_FNAME);
351    }
352    if (!jcr->ar) {
353       jcr->ar = (ATTR_DBR *)malloc(sizeof(ATTR_DBR));
354    }
355
356 #ifdef HAVE_SQLITE
357    if (!mdb->allow_transactions) {
358       return;
359    }
360    db_lock(mdb);
361    /* Allow only 10,000 changes per transaction */
362    if (mdb->transaction && mdb->changes > 10000) {
363       db_end_transaction(jcr, mdb);
364    }
365    if (!mdb->transaction) {
366       my_sqlite_query(mdb, "BEGIN");  /* begin transaction */
367       Dmsg0(400, "Start SQLite transaction\n");
368       mdb->transaction = 1;
369    }
370    db_unlock(mdb);
371 #endif
372
373 /*
374  * This is turned off because transactions break
375  * if multiple simultaneous jobs are run.
376  */
377 #ifdef HAVE_POSTGRESQL
378    if (!mdb->allow_transactions) {
379       return;
380    }
381    db_lock(mdb);
382    /* Allow only 25,000 changes per transaction */
383    if (mdb->transaction && mdb->changes > 25000) {
384       db_end_transaction(jcr, mdb);
385    }
386    if (!mdb->transaction) {
387       db_sql_query(mdb, "BEGIN", NULL, NULL);  /* begin transaction */
388       Dmsg0(400, "Start PosgreSQL transaction\n");
389       mdb->transaction = 1;
390    }
391    db_unlock(mdb);
392 #endif
393 }
394
395 void db_end_transaction(JCR *jcr, B_DB *mdb)
396 {
397    /*
398     * This can be called during thread cleanup and
399     *   the db may already be closed.  So simply return.
400     */
401    if (!mdb) {
402       return;
403    }
404
405    if (jcr && jcr->cached_attribute) {
406       Dmsg0(400, "Flush last cached attribute.\n");
407       if (!db_create_file_attributes_record(jcr, mdb, jcr->ar)) {
408          Jmsg1(jcr, M_FATAL, 0, _("Attribute create error. %s"), db_strerror(jcr->db));
409       }
410       jcr->cached_attribute = false;
411    }
412
413 #ifdef HAVE_SQLITE
414    if (!mdb->allow_transactions) {
415       return;
416    }
417    db_lock(mdb);
418    if (mdb->transaction) {
419       my_sqlite_query(mdb, "COMMIT"); /* end transaction */
420       mdb->transaction = 0;
421       Dmsg1(400, "End SQLite transaction changes=%d\n", mdb->changes);
422    }
423    mdb->changes = 0;
424    db_unlock(mdb);
425 #endif
426
427 #ifdef HAVE_POSTGRESQL
428    if (!mdb->allow_transactions) {
429       return;
430    }
431    db_lock(mdb);
432    if (mdb->transaction) {
433       db_sql_query(mdb, "COMMIT", NULL, NULL); /* end transaction */
434       mdb->transaction = 0;
435       Dmsg1(400, "End PostgreSQL transaction changes=%d\n", mdb->changes);
436    }
437    mdb->changes = 0;
438    db_unlock(mdb);
439 #endif
440 }
441
442 /*
443  * Given a full filename, split it into its path
444  *  and filename parts. They are returned in pool memory
445  *  in the mdb structure.
446  */
447 void split_path_and_file(JCR *jcr, B_DB *mdb, const char *fname)
448 {
449    const char *p, *f;
450
451    /* Find path without the filename.
452     * I.e. everything after the last / is a "filename".
453     * OK, maybe it is a directory name, but we treat it like
454     * a filename. If we don't find a / then the whole name
455     * must be a path name (e.g. c:).
456     */
457    for (p=f=fname; *p; p++) {
458       if (IsPathSeparator(*p)) {
459          f = p;                       /* set pos of last slash */
460       }
461    }
462    if (IsPathSeparator(*f)) {                   /* did we find a slash? */
463       f++;                            /* yes, point to filename */
464    } else {                           /* no, whole thing must be path name */
465       f = p;
466    }
467
468    /* If filename doesn't exist (i.e. root directory), we
469     * simply create a blank name consisting of a single
470     * space. This makes handling zero length filenames
471     * easier.
472     */
473    mdb->fnl = p - f;
474    if (mdb->fnl > 0) {
475       mdb->fname = check_pool_memory_size(mdb->fname, mdb->fnl+1);
476       memcpy(mdb->fname, f, mdb->fnl);    /* copy filename */
477       mdb->fname[mdb->fnl] = 0;
478    } else {
479       mdb->fname[0] = 0;
480       mdb->fnl = 0;
481    }
482
483    mdb->pnl = f - fname;
484    if (mdb->pnl > 0) {
485       mdb->path = check_pool_memory_size(mdb->path, mdb->pnl+1);
486       memcpy(mdb->path, fname, mdb->pnl);
487       mdb->path[mdb->pnl] = 0;
488    } else {
489       Mmsg1(&mdb->errmsg, _("Path length is zero. File=%s\n"), fname);
490       Jmsg(jcr, M_ERROR, 0, "%s", mdb->errmsg);
491       mdb->path[0] = 0;
492       mdb->pnl = 0;
493    }
494
495    Dmsg2(500, "split path=%s file=%s\n", mdb->path, mdb->fname);
496 }
497
498 /*
499  * List dashes as part of header for listing SQL results in a table
500  */
501 void
502 list_dashes(B_DB *mdb, DB_LIST_HANDLER *send, void *ctx)
503 {
504    SQL_FIELD  *field;
505    int i, j;
506
507    sql_field_seek(mdb, 0);
508    send(ctx, "+");
509    for (i = 0; i < sql_num_fields(mdb); i++) {
510       field = sql_fetch_field(mdb);
511       if (!field) {
512          break;
513       }
514       for (j = 0; j < (int)field->max_length + 2; j++) {
515          send(ctx, "-");
516       }
517       send(ctx, "+");
518    }
519    send(ctx, "\n");
520 }
521
522 /*
523  * If full_list is set, we list vertically, otherwise, we
524  * list on one line horizontally.
525  */
526 void
527 list_result(JCR *jcr, B_DB *mdb, DB_LIST_HANDLER *send, void *ctx, e_list_type type)
528 {
529    SQL_FIELD *field;
530    SQL_ROW row;
531    int i, col_len, max_len = 0;
532    char buf[2000], ewc[30];
533
534    Dmsg0(800, "list_result starts\n");
535    if (mdb->result == NULL || sql_num_rows(mdb) == 0) {
536       send(ctx, _("No results to list.\n"));
537       return;
538    }
539
540    Dmsg1(800, "list_result starts looking at %d fields\n", sql_num_fields(mdb));
541    /* determine column display widths */
542    sql_field_seek(mdb, 0);
543    for (i = 0; i < sql_num_fields(mdb); i++) {
544       Dmsg1(800, "list_result processing field %d\n", i);
545       field = sql_fetch_field(mdb);
546       if (!field) {
547          break;
548       }
549       col_len = cstrlen(field->name);
550       if (type == VERT_LIST) {
551          if (col_len > max_len) {
552             max_len = col_len;
553          }
554       } else {
555          if (IS_NUM(field->type) && (int)field->max_length > 0) { /* fixup for commas */
556             field->max_length += (field->max_length - 1) / 3;
557          }
558          if (col_len < (int)field->max_length) {
559             col_len = field->max_length;
560          }
561          if (col_len < 4 && !IS_NOT_NULL(field->flags)) {
562             col_len = 4;                 /* 4 = length of the word "NULL" */
563          }
564          field->max_length = col_len;    /* reset column info */
565       }
566    }
567
568    Dmsg0(800, "list_result finished first loop\n");
569    if (type == VERT_LIST) {
570       goto vertical_list;
571    }
572
573    Dmsg1(800, "list_result starts second loop looking at %d fields\n", sql_num_fields(mdb));
574    list_dashes(mdb, send, ctx);
575    send(ctx, "|");
576    sql_field_seek(mdb, 0);
577    for (i = 0; i < sql_num_fields(mdb); i++) {
578       Dmsg1(800, "list_result looking at field %d\n", i);
579       field = sql_fetch_field(mdb);
580       if (!field) {
581          break;
582       }
583       bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, field->name);
584       send(ctx, buf);
585    }
586    send(ctx, "\n");
587    list_dashes(mdb, send, ctx);
588
589    Dmsg1(800, "list_result starts third loop looking at %d fields\n", sql_num_fields(mdb));
590    while ((row = sql_fetch_row(mdb)) != NULL) {
591       sql_field_seek(mdb, 0);
592       send(ctx, "|");
593       for (i = 0; i < sql_num_fields(mdb); i++) {
594          field = sql_fetch_field(mdb);
595          if (!field) {
596             break;
597          }
598          if (row[i] == NULL) {
599             bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, "NULL");
600          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
601             bsnprintf(buf, sizeof(buf), " %*s |", (int)field->max_length,
602                       add_commas(row[i], ewc));
603          } else {
604             bsnprintf(buf, sizeof(buf), " %-*s |", (int)field->max_length, row[i]);
605          }
606          send(ctx, buf);
607       }
608       send(ctx, "\n");
609    }
610    list_dashes(mdb, send, ctx);
611    return;
612
613 vertical_list:
614
615    Dmsg1(800, "list_result starts vertical list at %d fields\n", sql_num_fields(mdb));
616    while ((row = sql_fetch_row(mdb)) != NULL) {
617       sql_field_seek(mdb, 0);
618       for (i = 0; i < sql_num_fields(mdb); i++) {
619          field = sql_fetch_field(mdb);
620          if (!field) {
621             break;
622          }
623          if (row[i] == NULL) {
624             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, "NULL");
625          } else if (IS_NUM(field->type) && !jcr->gui && is_an_integer(row[i])) {
626             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name,
627                 add_commas(row[i], ewc));
628          } else {
629             bsnprintf(buf, sizeof(buf), " %*s: %s\n", max_len, field->name, row[i]);
630          }
631          send(ctx, buf);
632       }
633       send(ctx, "\n");
634    }
635    return;
636 }
637
638 /* 
639  * Open a new connexion to mdb catalog. This function is used
640  * by batch and accurate mode.
641  */
642 bool db_open_batch_connexion(JCR *jcr, B_DB *mdb)
643 {
644    int multi_db=false;
645
646 #ifdef HAVE_BATCH_FILE_INSERT
647    multi_db=true;               /* we force a new connexion only if batch insert is enabled */
648 #endif
649
650    if (!jcr->db_batch) {
651       jcr->db_batch = db_init_database(jcr, 
652                                       mdb->db_name, 
653                                       mdb->db_user,
654                                       mdb->db_password, 
655                                       mdb->db_address,
656                                       mdb->db_port,
657                                       mdb->db_socket,
658                                       multi_db /* multi_db = true when using batch mode */);
659       if (!jcr->db_batch) {
660          Jmsg0(jcr, M_FATAL, 0, "Could not init batch connexion");
661          return false;
662       }
663
664       if (!db_open_database(jcr, jcr->db_batch)) {
665          Mmsg2(&jcr->db_batch->errmsg,  _("Could not open database \"%s\": ERR=%s\n"),
666               jcr->db_batch->db_name, db_strerror(jcr->db_batch));
667          Jmsg1(jcr, M_FATAL, 0, "%s", jcr->db_batch->errmsg);
668          return false;
669       }      
670       Dmsg3(100, "initdb ref=%d connected=%d db=%p\n", jcr->db_batch->ref_count,
671             jcr->db_batch->connected, jcr->db_batch->db);
672
673    }
674    return true;
675 }
676
677 #endif /* HAVE_SQLITE3 || HAVE_MYSQL || HAVE_SQLITE || HAVE_POSTGRESQL*/