]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/cats/postgresql.c
kes Apply patch supplied in bug #656 to pass priority field
[bacula/bacula] / bacula / src / cats / postgresql.c
1 /*
2  * Bacula Catalog Database routines specific to PostgreSQL
3  *   These are PostgreSQL specific routines
4  *
5  *    Dan Langille, December 2003
6  *    based upon work done by Kern Sibbald, March 2000
7  *
8  *    Version $Id$
9  */
10 /*
11    Copyright (C) 2003-2006 Kern Sibbald
12
13    This program is free software; you can redistribute it and/or
14    modify it under the terms of the GNU General Public License
15    version 2 as amended with additional clauses defined in the
16    file LICENSE in the main source directory.
17
18    This program is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
21    the file LICENSE for additional details.
22
23  */
24
25
26 /* The following is necessary so that we do not include
27  * the dummy external definition of DB.
28  */
29 #define __SQL_C                       /* indicate that this is sql.c */
30
31 #include "bacula.h"
32 #include "cats.h"
33
34 #ifdef HAVE_POSTGRESQL
35
36 #include "postgres_ext.h"       /* needed for NAMEDATALEN */
37
38 /* -----------------------------------------------------------------------
39  *
40  *   PostgreSQL dependent defines and subroutines
41  *
42  * -----------------------------------------------------------------------
43  */
44
45 /* List of open databases */
46 static BQUEUE db_list = {&db_list, &db_list};
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49
50 /*
51  * Retrieve database type
52  */
53 const char *
54 db_get_type(void)
55 {
56    return "PostgreSQL";
57
58 }
59
60 /*
61  * Initialize database data structure. In principal this should
62  * never have errors, or it is really fatal.
63  */
64 B_DB *
65 db_init_database(JCR *jcr, const char *db_name, const char *db_user, const char *db_password,
66                  const char *db_address, int db_port, const char *db_socket,
67                  int mult_db_connections)
68 {
69    B_DB *mdb;
70
71    if (!db_user) {
72       Jmsg(jcr, M_FATAL, 0, _("A user name for PostgreSQL must be supplied.\n"));
73       return NULL;
74    }
75    P(mutex);                          /* lock DB queue */
76    if (!mult_db_connections) {
77       /* Look to see if DB already open */
78       for (mdb=NULL; (mdb=(B_DB *)qnext(&db_list, &mdb->bq)); ) {
79          if (bstrcmp(mdb->db_name, db_name) &&
80              bstrcmp(mdb->db_address, db_address) &&
81              mdb->db_port == db_port) {
82             Dmsg2(100, "DB REopen %d %s\n", mdb->ref_count, db_name);
83             mdb->ref_count++;
84             V(mutex);
85             return mdb;                  /* already open */
86          }
87       }
88    }
89    Dmsg0(100, "db_open first time\n");
90    mdb = (B_DB *)malloc(sizeof(B_DB));
91    memset(mdb, 0, sizeof(B_DB));
92    mdb->db_name = bstrdup(db_name);
93    mdb->db_user = bstrdup(db_user);
94    if (db_password) {
95       mdb->db_password = bstrdup(db_password);
96    }
97    if (db_address) {
98       mdb->db_address  = bstrdup(db_address);
99    }
100    if (db_socket) {
101       mdb->db_socket   = bstrdup(db_socket);
102    }
103    mdb->db_port        = db_port;
104    mdb->have_insert_id = TRUE;
105    mdb->errmsg         = get_pool_memory(PM_EMSG); /* get error message buffer */
106    *mdb->errmsg        = 0;
107    mdb->cmd            = get_pool_memory(PM_EMSG); /* get command buffer */
108    mdb->cached_path    = get_pool_memory(PM_FNAME);
109    mdb->cached_path_id = 0;
110    mdb->ref_count      = 1;
111    mdb->fname          = get_pool_memory(PM_FNAME);
112    mdb->path           = get_pool_memory(PM_FNAME);
113    mdb->esc_name       = get_pool_memory(PM_FNAME);
114    mdb->allow_transactions = mult_db_connections;
115    qinsert(&db_list, &mdb->bq);            /* put db in list */
116    V(mutex);
117    return mdb;
118 }
119
120 /*
121  * Now actually open the database.  This can generate errors,
122  *   which are returned in the errmsg
123  *
124  * DO NOT close the database or free(mdb) here !!!!
125  */
126 int
127 db_open_database(JCR *jcr, B_DB *mdb)
128 {
129    int errstat;
130    char buf[10], *port;
131
132    P(mutex);
133    if (mdb->connected) {
134       V(mutex);
135       return 1;
136    }
137    mdb->connected = false;
138
139    if ((errstat=rwl_init(&mdb->lock)) != 0) {
140       Mmsg1(&mdb->errmsg, _("Unable to initialize DB lock. ERR=%s\n"),
141             strerror(errstat));
142       V(mutex);
143       return 0;
144    }
145
146    if (mdb->db_port) {
147       bsnprintf(buf, sizeof(buf), "%d", mdb->db_port);
148       port = buf;
149    } else {
150       port = NULL;
151    }
152
153    /* If connection fails, try at 5 sec intervals for 30 seconds. */
154    for (int retry=0; retry < 6; retry++) {
155       /* connect to the database */
156       mdb->db = PQsetdbLogin(
157            mdb->db_address,           /* default = localhost */
158            port,                      /* default port */
159            NULL,                      /* pg options */
160            NULL,                      /* tty, ignored */
161            mdb->db_name,              /* database name */
162            mdb->db_user,              /* login name */
163            mdb->db_password);         /* password */
164
165       /* If no connect, try once more in case it is a timing problem */
166       if (PQstatus(mdb->db) == CONNECTION_OK) {
167          break;
168       }
169       bmicrosleep(5, 0);
170    }
171
172    Dmsg0(50, "pg_real_connect done\n");
173    Dmsg3(50, "db_user=%s db_name=%s db_password=%s\n", mdb->db_user, mdb->db_name,
174             mdb->db_password==NULL?"(NULL)":mdb->db_password);
175
176    if (PQstatus(mdb->db) != CONNECTION_OK) {
177       Mmsg2(&mdb->errmsg, _("Unable to connect to PostgreSQL server.\n"
178             "Database=%s User=%s\n"
179             "It is probably not running or your password is incorrect.\n"),
180              mdb->db_name, mdb->db_user);
181       V(mutex);
182       return 0;
183    }
184
185    if (!check_tables_version(jcr, mdb)) {
186       V(mutex);
187       return 0;
188    }
189
190    sql_query(mdb, "SET datestyle TO 'ISO, YMD'");
191
192    mdb->connected = true;
193    V(mutex);
194    return 1;
195 }
196
197 void
198 db_close_database(JCR *jcr, B_DB *mdb)
199 {
200    if (!mdb) {
201       return;
202    }
203    db_end_transaction(jcr, mdb);
204    P(mutex);
205    mdb->ref_count--;
206    if (mdb->ref_count == 0) {
207       qdchain(&mdb->bq);
208       if (mdb->connected && mdb->db) {
209          sql_close(mdb);
210       }
211       rwl_destroy(&mdb->lock);
212       free_pool_memory(mdb->errmsg);
213       free_pool_memory(mdb->cmd);
214       free_pool_memory(mdb->cached_path);
215       free_pool_memory(mdb->fname);
216       free_pool_memory(mdb->path);
217       free_pool_memory(mdb->esc_name);
218       if (mdb->db_name) {
219          free(mdb->db_name);
220       }
221       if (mdb->db_user) {
222          free(mdb->db_user);
223       }
224       if (mdb->db_password) {
225          free(mdb->db_password);
226       }
227       if (mdb->db_address) {
228          free(mdb->db_address);
229       }
230       if (mdb->db_socket) {
231          free(mdb->db_socket);
232       }
233       my_postgresql_free_result(mdb);
234       free(mdb);
235    }
236    V(mutex);
237 }
238
239 /*
240  * Return the next unique index (auto-increment) for
241  * the given table.  Return NULL on error.
242  *
243  * For PostgreSQL, NULL causes the auto-increment value
244  *  to be updated.
245  */
246 int db_next_index(JCR *jcr, B_DB *mdb, char *table, char *index)
247 {
248    strcpy(index, "NULL");
249    return 1;
250 }
251
252
253 /*
254  * Escape strings so that PostgreSQL is happy
255  *
256  *   NOTE! len is the length of the old string. Your new
257  *         string must be long enough (max 2*old+1) to hold
258  *         the escaped output.
259  */
260 void
261 db_escape_string(char *snew, char *old, int len)
262 {
263    PQescapeString(snew, old, len);
264 }
265
266 /*
267  * Submit a general SQL command (cmd), and for each row returned,
268  *  the sqlite_handler is called with the ctx.
269  */
270 int db_sql_query(B_DB *mdb, const char *query, DB_RESULT_HANDLER *result_handler, void *ctx)
271 {
272    SQL_ROW row;
273
274    Dmsg0(500, "db_sql_query started\n");
275
276    db_lock(mdb);
277    if (sql_query(mdb, query) != 0) {
278       Mmsg(mdb->errmsg, _("Query failed: %s: ERR=%s\n"), query, sql_strerror(mdb));
279       db_unlock(mdb);
280       Dmsg0(500, "db_sql_query failed\n");
281       return 0;
282    }
283    Dmsg0(500, "db_sql_query succeeded. checking handler\n");
284
285    if (result_handler != NULL) {
286       Dmsg0(500, "db_sql_query invoking handler\n");
287       if ((mdb->result = sql_store_result(mdb)) != NULL) {
288          int num_fields = sql_num_fields(mdb);
289
290          Dmsg0(500, "db_sql_query sql_store_result suceeded\n");
291          while ((row = sql_fetch_row(mdb)) != NULL) {
292
293             Dmsg0(500, "db_sql_query sql_fetch_row worked\n");
294             if (result_handler(ctx, num_fields, row))
295                break;
296          }
297
298         sql_free_result(mdb);
299       }
300    }
301    db_unlock(mdb);
302
303    Dmsg0(500, "db_sql_query finished\n");
304
305    return 1;
306 }
307
308
309
310 POSTGRESQL_ROW my_postgresql_fetch_row(B_DB *mdb)
311 {
312    int j;
313    POSTGRESQL_ROW row = NULL; // by default, return NULL
314
315    Dmsg0(500, "my_postgresql_fetch_row start\n");
316
317    if (mdb->row_number == -1 || mdb->row == NULL) {
318       Dmsg1(500, "we have need space of %d bytes\n", sizeof(char *) * mdb->num_fields);
319
320       if (mdb->row != NULL) {
321          Dmsg0(500, "my_postgresql_fetch_row freeing space\n");
322          free(mdb->row);
323          mdb->row = NULL;
324       }
325
326       mdb->row = (POSTGRESQL_ROW) malloc(sizeof(char *) * mdb->num_fields);
327
328       // now reset the row_number now that we have the space allocated
329       mdb->row_number = 0;
330    }
331
332    // if still within the result set
333    if (mdb->row_number < mdb->num_rows) {
334       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
335       // get each value from this row
336       for (j = 0; j < mdb->num_fields; j++) {
337          mdb->row[j] = PQgetvalue(mdb->result, mdb->row_number, j);
338          Dmsg2(500, "my_postgresql_fetch_row field '%d' has value '%s'\n", j, mdb->row[j]);
339       }
340       // increment the row number for the next call
341       mdb->row_number++;
342
343       row = mdb->row;
344    } else {
345       Dmsg2(500, "my_postgresql_fetch_row row number '%d' is NOT acceptable (0..%d)\n", mdb->row_number, mdb->num_rows);
346    }
347
348    Dmsg1(500, "my_postgresql_fetch_row finishes returning %x\n", row);
349
350    return row;
351 }
352
353 int my_postgresql_max_length(B_DB *mdb, int field_num) {
354    //
355    // for a given column, find the max length
356    //
357    int max_length;
358    int i;
359    int this_length;
360
361    max_length = 0;
362    for (i = 0; i < mdb->num_rows; i++) {
363       if (PQgetisnull(mdb->result, i, field_num)) {
364           this_length = 4;        // "NULL"
365       } else {
366           this_length = cstrlen(PQgetvalue(mdb->result, i, field_num));
367       }
368
369       if (max_length < this_length) {
370           max_length = this_length;
371       }
372    }
373
374    return max_length;
375 }
376
377 POSTGRESQL_FIELD * my_postgresql_fetch_field(B_DB *mdb)
378 {
379    int     i;
380
381    Dmsg0(500, "my_postgresql_fetch_field starts\n");
382    if (mdb->fields == NULL) {
383       Dmsg1(500, "allocating space for %d fields\n", mdb->num_fields);
384       mdb->fields = (POSTGRESQL_FIELD *)malloc(sizeof(POSTGRESQL_FIELD) * mdb->num_fields);
385
386       for (i = 0; i < mdb->num_fields; i++) {
387          Dmsg1(500, "filling field %d\n", i);
388          mdb->fields[i].name           = PQfname(mdb->result, i);
389          mdb->fields[i].max_length = my_postgresql_max_length(mdb, i);
390          mdb->fields[i].type       = PQftype(mdb->result, i);
391          mdb->fields[i].flags      = 0;
392
393          Dmsg4(500, "my_postgresql_fetch_field finds field '%s' has length='%d' type='%d' and IsNull=%d\n",
394             mdb->fields[i].name, mdb->fields[i].max_length, mdb->fields[i].type,
395             mdb->fields[i].flags);
396       } // end for
397    } // end if
398
399    // increment field number for the next time around
400
401    Dmsg0(500, "my_postgresql_fetch_field finishes\n");
402    return &mdb->fields[mdb->field_number++];
403 }
404
405 void my_postgresql_data_seek(B_DB *mdb, int row)
406 {
407    // set the row number to be returned on the next call
408    // to my_postgresql_fetch_row
409    mdb->row_number = row;
410 }
411
412 void my_postgresql_field_seek(B_DB *mdb, int field)
413 {
414    mdb->field_number = field;
415 }
416
417 /*
418  * Note, if this routine returns 1 (failure), Bacula expects
419  *  that no result has been stored.
420  */
421 int my_postgresql_query(B_DB *mdb, const char *query) {
422    Dmsg0(500, "my_postgresql_query started\n");
423    // We are starting a new query.  reset everything.
424    mdb->num_rows     = -1;
425    mdb->row_number   = -1;
426    mdb->field_number = -1;
427
428    if (mdb->result != NULL) {
429       PQclear(mdb->result);  /* hmm, someone forgot to free?? */
430    }
431
432    Dmsg1(500, "my_postgresql_query starts with '%s'\n", query);
433    mdb->result = PQexec(mdb->db, query);
434    mdb->status = PQresultStatus(mdb->result);
435    if (mdb->status == PGRES_TUPLES_OK || mdb->status == PGRES_COMMAND_OK) {
436       Dmsg1(500, "we have a result\n", query);
437
438       // how many fields in the set?
439       mdb->num_fields = (int) PQnfields(mdb->result);
440       Dmsg1(500, "we have %d fields\n", mdb->num_fields);
441
442       mdb->num_rows   = PQntuples(mdb->result);
443       Dmsg1(500, "we have %d rows\n", mdb->num_rows);
444
445       mdb->status = 0;
446    } else {
447       Dmsg1(500, "we failed\n", query);
448       mdb->status = 1;
449    }
450
451    Dmsg0(500, "my_postgresql_query finishing\n");
452
453    return mdb->status;
454 }
455
456 void my_postgresql_free_result (B_DB *mdb)
457 {
458    if (mdb->result) {
459       PQclear(mdb->result);
460       mdb->result = NULL;
461    }
462
463    if (mdb->row) {
464       free(mdb->row);
465       mdb->row = NULL;
466    }
467
468    if (mdb->fields) {
469       free(mdb->fields);
470       mdb->fields = NULL;
471    }
472 }
473
474 int my_postgresql_currval(B_DB *mdb, char *table_name)
475 {
476    // Obtain the current value of the sequence that
477    // provides the serial value for primary key of the table.
478
479    // currval is local to our session.  It is not affected by
480    // other transactions.
481
482    // Determine the name of the sequence.
483    // PostgreSQL automatically creates a sequence using
484    // <table>_<column>_seq.
485    // At the time of writing, all tables used this format for
486    // for their primary key: <table>id
487    // Except for basefiles which has a primary key on baseid.
488    // Therefore, we need to special case that one table.
489
490    // everything else can use the PostgreSQL formula.
491
492    char      sequence[NAMEDATALEN-1];
493    char      query   [NAMEDATALEN+50];
494    PGresult *result;
495    int       id = 0;
496
497    if (strcasecmp(table_name, "basefiles") == 0) {
498       bstrncpy(sequence, "basefiles_baseid", sizeof(sequence));
499    } else {
500       bstrncpy(sequence, table_name, sizeof(sequence));
501       bstrncat(sequence, "_",        sizeof(sequence));
502       bstrncat(sequence, table_name, sizeof(sequence));
503       bstrncat(sequence, "id",       sizeof(sequence));
504    }
505
506    bstrncat(sequence, "_seq", sizeof(sequence));
507    bsnprintf(query, sizeof(query), "SELECT currval('%s')", sequence);
508
509 // Mmsg(query, "SELECT currval('%s')", sequence);
510    Dmsg1(500, "my_postgresql_currval invoked with '%s'\n", query);
511    result = PQexec(mdb->db, query);
512
513    Dmsg0(500, "exec done");
514
515    if (PQresultStatus(result) == PGRES_TUPLES_OK) {
516       Dmsg0(500, "getting value");
517       id = atoi(PQgetvalue(result, 0, 0));
518       Dmsg2(500, "got value '%s' which became %d\n", PQgetvalue(result, 0, 0), id);
519    } else {
520       Mmsg1(&mdb->errmsg, _("error fetching currval: %s\n"), PQerrorMessage(mdb->db));
521    }
522
523    PQclear(result);
524
525    return id;
526 }
527
528
529 #endif /* HAVE_POSTGRESQL */