]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
added VSS toggling by enable_vss
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2  * Bacula Catalog Database routines specific to PostgreSQL
3  *   These are PostgreSQL specific routines
4  *
5  *    Dan Langille, December 2003
6  *    based upon work done by Kern Sibbald, March 2000
7  *
8  *    Version $Id$
9  */
10 /*
11    Copyright (C) 2003-2005 Kern Sibbald
12
13    This program is free software; you can redistribute it and/or
14    modify it under the terms of the GNU General Public License
15    version 2 as ammended with additional clauses defined in the
16    file LICENSE in the main source directory.
17
18    This program is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
21    the file LICENSE for additional details.
22
23  */
24
25
26 /* The following is necessary so that we do not include
27  * the dummy external definition of DB.
28  */
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats.h"
33
34 #ifdef HAVE_POSTGRESQL
35
36 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
37
38 /* -----------------------------------------------------------------------
39  *
40  *   PostgreSQL dependent defines and subroutines
41  *
42  * -----------------------------------------------------------------------
43  */
44
45 /* List of open databases */
46 static BQUEUE db_list = {&db_list, &db_list};
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49
50 /*
51  * Initialize database data structure. In principal this should
52  * never have errors, or it is really fatal.
53  */
54 B_DB *
55 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
56                  const char *db_address, int db_port, const char *db_socket,
57                  int mult_db_connections)
58 {
59    B_DB *mdb;
60
61    if (!db_user) {
62       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
63       return NULL;
64    }
65    P(mutex);                          /* lock DB queue */
66    if (!mult_db_connections) {
67       /* Look to see if DB already open */
68       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
69          if (strcmp(mdb->db_name, db_name) == 0) {
70             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
71             mdb->ref_count++;
72             V(mutex);
73             return mdb;                  /* already open */
74          }
75       }
76    }
77    Dmsg0(100, "db_open first time\n");
78    mdb = (B_DB *)malloc(sizeof(B_DB));
79    memset(mdb, 0, sizeof(B_DB));
80    mdb->db_name = bstrdup(db_name);
81    mdb->db_user = bstrdup(db_user);
82    if (db_password) {
83       mdb->db_password = bstrdup(db_password);
84    }
85    if (db_address) {
86       mdb->db_address  = bstrdup(db_address);
87    }
88    if (db_socket) {
89       mdb->db_socket   = bstrdup(db_socket);
90    }
91    mdb->db_port        = db_port;
92    mdb->have_insert_id = TRUE;
93    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
94    *mdb->errmsg        = 0;
95    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
96    mdb->cached_path    = get_pool_memory(PM_FNAME);
97    mdb->cached_path_id = 0;
98    mdb->ref_count      = 1;
99    mdb->fname          = get_pool_memory(PM_FNAME);
100    mdb->path           = get_pool_memory(PM_FNAME);
101    mdb->esc_name       = get_pool_memory(PM_FNAME);
102    mdb->allow_transactions = mult_db_connections;
103    qinsert(&db_list, &mdb->bq);            /* put db in list */
104    V(mutex);
105    return mdb;
106 }
107
108 /*
109  * Now actually open the database.  This can generate errors,
110  *   which are returned in the errmsg
111  *
112  * DO NOT close the database or free(mdb) here !!!!
113  */
114 int
115 db_open_database(JCR *jcr, B_DB *mdb)
116 {
117    int errstat;
118    char buf[10], *port;
119
120    P(mutex);
121    if (mdb->connected) {
122       V(mutex);
123       return 1;
124    }
125    mdb->connected = false;
126
127    if ((errstat=rwl_init(&mdb->lock)) != 0) {
128       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
129             strerror(errstat));
130       V(mutex);
131       return 0;
132    }
133
134    if (mdb->db_port) {
135       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
136       port = buf;
137    } else {
138       port = NULL;
139    }
140
141    /* If connection fails, try at 5 sec intervals for 30 seconds. */
142    for (int retry=0; retry < 6; retry++) {
143       /* connect to the database */
144       mdb->db = PQsetdbLogin(
145            mdb->db_address,           /* default = localhost */
146            port,                      /* default port */
147            NULL,                      /* pg options */
148            NULL,                      /* tty, ignored */
149            mdb->db_name,              /* database name */
150            mdb->db_user,              /* login name */
151            mdb->db_password);         /* password */
152
153       /* If no connect, try once more in case it is a timing problem */
154       if (PQstatus(mdb->db) == CONNECTION_OK) {
155          break;
156       }
157       bmicrosleep(5, 0);
158    }
159
160    Dmsg0(50, "pg_real_connect done\n");
161    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
162             mdb->db_password==NULL?"(NULL)":mdb->db_password);
163
164    if (PQstatus(mdb->db) != CONNECTION_OK) {
165       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
166             "Database=%s User=%s\n"
167             "It is probably not running or your password is incorrect.\n"),
168              mdb->db_name, mdb->db_user);
169       V(mutex);
170       return 0;
171    }
172
173    if (!check_tables_version(jcr, mdb)) {
174       V(mutex);
175       return 0;
176    }
177
178    mdb->connected = true;
179    V(mutex);
180    return 1;
181 }
182
183 void
184 db_close_database(JCR *jcr, B_DB *mdb)
185 {
186    if (!mdb) {
187       return;
188    }
189    db_end_transaction(jcr, mdb);
190    P(mutex);
191    mdb->ref_count--;
192    if (mdb->ref_count == 0) {
193       qdchain(&mdb->bq);
194       if (mdb->connected && mdb->db) {
195          sql_close(mdb);
196       }
197       rwl_destroy(&mdb->lock);
198       free_pool_memory(mdb->errmsg);
199       free_pool_memory(mdb->cmd);
200       free_pool_memory(mdb->cached_path);
201       free_pool_memory(mdb->fname);
202       free_pool_memory(mdb->path);
203       free_pool_memory(mdb->esc_name);
204       if (mdb->db_name) {
205          free(mdb->db_name);
206       }
207       if (mdb->db_user) {
208          free(mdb->db_user);
209       }
210       if (mdb->db_password) {
211          free(mdb->db_password);
212       }
213       if (mdb->db_address) {
214          free(mdb->db_address);
215       }
216       if (mdb->db_socket) {
217          free(mdb->db_socket);
218       }
219       my_postgresql_free_result(mdb);
220       free(mdb);
221    }
222    V(mutex);
223 }
224
225 /*
226  * Return the next unique index (auto-increment) for
227  * the given table.  Return NULL on error.
228  *
229  * For PostgreSQL, NULL causes the auto-increment value
230  *  to be updated.
231  */
232 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
233 {
234    strcpy(index, "NULL");
235    return 1;
236 }
237
238
239 /*
240  * Escape strings so that PostgreSQL is happy
241  *
242  *   NOTE! len is the length of the old string. Your new
243  *         string must be long enough (max 2*old+1) to hold
244  *         the escaped output.
245  */
246 void
247 db_escape_string(char *snew, char *old, int len)
248 {
249    PQescapeString(snew, old, len);
250 }
251
252 /*
253  * Submit a general SQL command (cmd), and for each row returned,
254  *  the sqlite_handler is called with the ctx.
255  */
256 int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
257 {
258    SQL_ROW row;
259
260    Dmsg0(500, "db_sql_query started\n");
261
262    db_lock(mdb);
263    if (sql_query(mdb, query) != 0) {
264       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
265       db_unlock(mdb);
266       Dmsg0(500, "db_sql_query failed\n");
267       return 0;
268    }
269    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
270
271    if (result_handler != NULL) {
272       Dmsg0(500, "db_sql_query invoking handler\n");
273       if ((mdb->result = sql_store_result(mdb)) != NULL) {
274          int num_fields = sql_num_fields(mdb);
275
276          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
277          while ((row = sql_fetch_row(mdb)) != NULL) {
278
279             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
280             if (result_handler(ctx, num_fields, row))
281                break;
282          }
283
284         sql_free_result(mdb);
285       }
286    }
287    db_unlock(mdb);
288
289    Dmsg0(500, "db_sql_query finished\n");
290
291    return 1;
292 }
293
294
295
296 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
297 {
298    int j;
299    POSTGRESQL_ROW row = NULL; // by default, return NULL
300
301    Dmsg0(500, "my_postgresql_fetch_row start\n");
302
303    if (mdb->row_number == -1 || mdb->row == NULL) {
304       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
305
306       if (mdb->row != NULL) {
307          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
308          free(mdb->row);
309          mdb->row = NULL;
310       }
311
312       mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields);
313
314       // now reset the row_number now that we have the space allocated
315       mdb->row_number = 0;
316    }
317
318    // if still within the result set
319    if (mdb->row_number < mdb->num_rows) {
320       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
321       // get each value from this row
322       for (j = 0; j < mdb->num_fields; j++) {
323          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
324          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
325       }
326       // increment the row number for the next call
327       mdb->row_number++;
328
329       row = mdb->row;
330    } else {
331       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
332    }
333
334    Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
335
336    return row;
337 }
338
339 int my_postgresql_max_length(B_DB *mdb, int field_num) {
340    //
341    // for a given column, find the max length
342    //
343    int max_length;
344    int i;
345    int this_length;
346
347    max_length = 0;
348    for (i = 0; i < mdb->num_rows; i++) {
349       if (PQgetisnull(mdb->result, i, field_num)) {
350           this_length = 4;        // "NULL"
351       } else {
352           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
353       }
354
355       if (max_length < this_length) {
356           max_length = this_length;
357       }
358    }
359
360    return max_length;
361 }
362
363 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
364 {
365    int     i;
366
367    Dmsg0(500, "my_postgresql_fetch_field starts\n");
368    if (mdb->fields == NULL) {
369       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
370       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
371
372       for (i = 0; i < mdb->num_fields; i++) {
373          Dmsg1(500, "filling field %d\n", i);
374          mdb->fields[i].name           = PQfname(mdb->result, i);
375          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
376          mdb->fields[i].type       = PQftype(mdb->result, i);
377          mdb->fields[i].flags      = 0;
378
379          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
380             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
381             mdb->fields[i].flags);
382       } // end for
383    } // end if
384
385    // increment field number for the next time around
386
387    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
388    return &mdb->fields[mdb->field_number++];
389 }
390
391 void my_postgresql_data_seek(B_DB *mdb, int row)
392 {
393    // set the row number to be returned on the next call
394    // to my_postgresql_fetch_row
395    mdb->row_number = row;
396 }
397
398 void my_postgresql_field_seek(B_DB *mdb, int field)
399 {
400    mdb->field_number = field;
401 }
402
403 /*
404  * Note, if this routine returns 1 (failure), Bacula expects
405  *  that no result has been stored.
406  */
407 int my_postgresql_query(B_DB *mdb, const char *query) {
408    Dmsg0(500, "my_postgresql_query started\n");
409    // We are starting a new query.  reset everything.
410    mdb->num_rows     = -1;
411    mdb->row_number   = -1;
412    mdb->field_number = -1;
413
414    if (mdb->result != NULL) {
415       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
416    }
417
418    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
419    mdb->result = PQexec(mdb->db, query);
420    mdb->status = PQresultStatus(mdb->result);
421    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
422       Dmsg1(500, "we have a result\n", query);
423
424       // how many fields in the set?
425       mdb->num_fields = (int) PQnfields(mdb->result);
426       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
427
428       mdb->num_rows   = PQntuples(mdb->result);
429       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
430
431       mdb->status = 0;
432    } else {
433       Dmsg1(500, "we failed\n", query);
434       mdb->status = 1;
435    }
436
437    Dmsg0(500, "my_postgresql_query finishing\n");
438
439    return mdb->status;
440 }
441
442 void my_postgresql_free_result (B_DB *mdb)
443 {
444    if (mdb->result) {
445       PQclear(mdb->result);
446       mdb->result = NULL;
447    }
448
449    if (mdb->row) {
450       free(mdb->row);
451       mdb->row = NULL;
452    }
453
454    if (mdb->fields) {
455       free(mdb->fields);
456       mdb->fields = NULL;
457    }
458 }
459
460 int my_postgresql_currval(B_DB *mdb, char *table_name)
461 {
462    // Obtain the current value of the sequence that
463    // provides the serial value for primary key of the table.
464
465    // currval is local to our session.  It is not affected by
466    // other transactions.
467
468    // Determine the name of the sequence.
469    // PostgreSQL automatically creates a sequence using
470    // <table>_<column>_seq.
471    // At the time of writing, all tables used this format for
472    // for their primary key: <table>id
473    // Except for basefiles which has a primary key on baseid.
474    // Therefore, we need to special case that one table.
475
476    // everything else can use the PostgreSQL formula.
477
478    char      sequence[NAMEDATALEN-1];
479    char      query   [NAMEDATALEN+50];
480    PGresult *result;
481    int       id = 0;
482
483    if (strcasecmp(table_name, "basefiles") == 0) {
484       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
485    } else {
486       bstrncpy(sequence, table_name, sizeof(sequence));
487       bstrncat(sequence, "_",        sizeof(sequence));
488       bstrncat(sequence, table_name, sizeof(sequence));
489       bstrncat(sequence, "id",       sizeof(sequence));
490    }
491
492    bstrncat(sequence, "_seq", sizeof(sequence));
493    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
494
495 // Mmsg(query, "SELECT currval('%s')", sequence);
496    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
497    result = PQexec(mdb->db, query);
498
499    Dmsg0(500, "exec done");
500
501    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
502       Dmsg0(500, "getting value");
503       id = atoi(PQgetvalue(result, 0, 0));
504       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
505    } else {
506       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
507    }
508
509    PQclear(result);
510
511    return id;
512 }
513
514
515 #endif /* HAVE_POSTGRESQL */