]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
kes Fix SQL case problem that may cause the failure of DiskToCatalog
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *
30  *   Bacula Director -- msgchan.c -- handles the message channel
31  *    to the Storage daemon and the File daemon.
32  *
33  *     Kern Sibbald, August MM
34  *
35  *    This routine runs as a thread and must be thread reentrant.
36  *
37  *  Basic tasks done here:
38  *    Open a message channel with the Storage daemon
39  *      to authenticate ourself and to pass the JobId.
40  *    Create a thread to interact with the Storage daemon
41  *      who returns a job status and requests Catalog services, etc.
42  *
43  *   Version $Id$
44  */
45
46 #include "bacula.h"
47 #include "dird.h"
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 /* Commands sent to Storage daemon */
52 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
53    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd;
80    STORE *store;
81    utime_t heart_beat;    
82
83    if (jcr->store_bsock) {
84       return true;                    /* already connected */
85    }
86
87    /* If there is a write storage use it */
88    if (jcr->wstore) {
89       store = jcr->wstore;
90    } else {
91       store = jcr->rstore;
92    }
93
94    if (store->heartbeat_interval) {
95       heart_beat = store->heartbeat_interval;
96    } else {           
97       heart_beat = director->heartbeat_interval;
98    }
99
100    /*
101     *  Open message channel with the Storage daemon
102     */
103    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
104       store->SDport);
105    sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat,
106           _("Storage daemon"), store->address,
107           NULL, store->SDport, verbose);
108    if (sd == NULL) {
109       return false;
110    }
111    sd->res = (RES *)store;        /* save pointer to other end */
112    jcr->store_bsock = sd;
113
114    if (!authenticate_storage_daemon(jcr, store)) {
115       sd->close();
116       jcr->store_bsock = NULL;
117       return false;
118    }
119    return true;
120 }
121
122 /*
123  * Here we ask the SD to send us the info for a 
124  *  particular device resource.
125  */
126 #ifdef needed
127 bool update_device_res(JCR *jcr, DEVICE *dev)
128 {
129    POOL_MEM device_name; 
130    BSOCK *sd;
131    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
132       return false;
133    }
134    sd = jcr->store_bsock;
135    pm_strcpy(device_name, dev->name());
136    bash_spaces(device_name);
137    sd->fsend(query_device, device_name.c_str());
138    Dmsg1(100, ">stored: %s\n", sd->msg);
139    /* The data is returned through Device_update */
140    if (bget_dirmsg(sd) <= 0) {
141       return false;
142    }
143    return true;
144 }
145 #endif
146
147 /*
148  * Start a job with the Storage daemon
149  */
150 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
151 {
152    bool ok = true;
153    STORE *storage;
154    BSOCK *sd;
155    char auth_key[100];
156    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
157    POOL_MEM job_name, client_name, fileset_name;
158    int copy = 0;
159    int stripe = 0;
160    char ed1[30];
161
162    sd = jcr->store_bsock;
163    /*
164     * Now send JobId and permissions, and get back the authorization key.
165     */
166    pm_strcpy(job_name, jcr->job->name());
167    bash_spaces(job_name);
168    pm_strcpy(client_name, jcr->client->name());
169    bash_spaces(client_name);
170    pm_strcpy(fileset_name, jcr->fileset->name());
171    bash_spaces(fileset_name);
172    if (jcr->fileset->MD5[0] == 0) {
173       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
174    }
175    /* If rescheduling, cancel the previous incarnation of this job
176     *  with the SD, which might be waiting on the FD connection.
177     *  If we do not cancel it the SD will not accept a new connection
178     *  for the same jobid.
179     */
180    if (jcr->reschedule_count) {
181       sd->fsend("cancel Job=%s\n", jcr->Job);
182       while (sd->recv() >= 0)
183          { }
184    } 
185    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
186              job_name.c_str(), client_name.c_str(), 
187              jcr->JobType, jcr->JobLevel,
188              fileset_name.c_str(), !jcr->pool->catalog_files,
189              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
190              jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
191    Dmsg1(100, ">stored: %s\n", sd->msg);
192    if (bget_dirmsg(sd) > 0) {
193        Dmsg1(100, "<stored: %s", sd->msg);
194        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
195                   &jcr->VolSessionTime, &auth_key) != 3) {
196           Dmsg1(100, "BadJob=%s\n", sd->msg);
197           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
198           return 0;
199        } else {
200           jcr->sd_auth_key = bstrdup(auth_key);
201           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
202        }
203    } else {
204       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
205          sd->bstrerror());
206       return 0;
207    }
208
209    /*
210     * We have two loops here. The first comes from the 
211     *  Storage = associated with the Job, and we need 
212     *  to attach to each one.
213     * The inner loop loops over all the alternative devices
214     *  associated with each Storage. It selects the first
215     *  available one.
216     *
217     */
218    /* Do read side of storage daemon */
219    if (ok && rstore) {
220       /* For the moment, only migrate has rpool */
221       if (jcr->JobType == JT_MIGRATE) {
222          pm_strcpy(pool_type, jcr->rpool->pool_type);
223          pm_strcpy(pool_name, jcr->rpool->name());
224       } else {
225          pm_strcpy(pool_type, jcr->pool->pool_type);
226          pm_strcpy(pool_name, jcr->pool->name());
227       }
228       bash_spaces(pool_type);
229       bash_spaces(pool_name);
230       foreach_alist(storage, rstore) {
231          Dmsg1(100, "Rstore=%s\n", storage->name());
232          pm_strcpy(store_name, storage->name());
233          bash_spaces(store_name);
234          pm_strcpy(media_type, storage->media_type);
235          bash_spaces(media_type);
236          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
237                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
238          Dmsg1(100, "rstore >stored: %s", sd->msg);
239          DEVICE *dev;
240          /* Loop over alternative storage Devices until one is OK */
241          foreach_alist(dev, storage->device) {
242             pm_strcpy(device_name, dev->name());
243             bash_spaces(device_name);
244             sd->fsend(use_device, device_name.c_str());
245             Dmsg1(100, ">stored: %s", sd->msg);
246          }
247          sd->signal(BNET_EOD);           /* end of Devices */
248       }
249       sd->signal(BNET_EOD);              /* end of Storages */
250       if (bget_dirmsg(sd) > 0) {
251          Dmsg1(100, "<stored: %s", sd->msg);
252          /* ****FIXME**** save actual device name */
253          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
254       } else {
255          ok = false;
256       }
257    }
258
259    /* Do write side of storage daemon */
260    if (ok && wstore) {
261       pm_strcpy(pool_type, jcr->pool->pool_type);
262       pm_strcpy(pool_name, jcr->pool->name());
263       bash_spaces(pool_type);
264       bash_spaces(pool_name);
265       foreach_alist(storage, wstore) {
266          pm_strcpy(store_name, storage->name());
267          bash_spaces(store_name);
268          pm_strcpy(media_type, storage->media_type);
269          bash_spaces(media_type);
270          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
271                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
272
273          Dmsg1(100, "wstore >stored: %s", sd->msg);
274          DEVICE *dev;
275          /* Loop over alternative storage Devices until one is OK */
276          foreach_alist(dev, storage->device) {
277             pm_strcpy(device_name, dev->name());
278             bash_spaces(device_name);
279             sd->fsend(use_device, device_name.c_str());
280             Dmsg1(100, ">stored: %s", sd->msg);
281          }
282          sd->signal(BNET_EOD);           /* end of Devices */
283       }
284       sd->signal(BNET_EOD);              /* end of Storages */
285       if (bget_dirmsg(sd) > 0) {
286          Dmsg1(100, "<stored: %s", sd->msg);
287          /* ****FIXME**** save actual device name */
288          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
289       } else {
290          ok = false;
291       }
292    }
293    if (!ok) {
294       POOL_MEM err_msg;
295       if (sd->msg[0]) {
296          pm_strcpy(err_msg, sd->msg); /* save message */
297          Jmsg(jcr, M_FATAL, 0, _("\n"
298               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
299               device_name.c_str(), err_msg.c_str()/* sd->msg */);
300       } else { 
301          Jmsg(jcr, M_FATAL, 0, _("\n"
302               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
303               device_name.c_str());
304       }
305    } else {
306       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
307    }
308    return ok;
309 }
310
311 /*
312  * Start a thread to handle Storage daemon messages and
313  *  Catalog requests.
314  */
315 bool start_storage_daemon_message_thread(JCR *jcr)
316 {
317    int status;
318    pthread_t thid;
319
320    jcr->inc_use_count();              /* mark in use by msg thread */
321    jcr->sd_msg_thread_done = false;
322    jcr->SD_msg_chan = 0;
323    Dmsg0(100, "Start SD msg_thread.\n");
324    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
325       berrno be;
326       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
327    }
328    /* Wait for thread to start */
329    while (jcr->SD_msg_chan == 0) {
330       bmicrosleep(0, 50);
331       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
332          return false;
333       }
334    }
335    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
336    return true;
337 }
338
339 extern "C" void msg_thread_cleanup(void *arg)
340 {
341    JCR *jcr = (JCR *)arg;
342    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
343    jcr->sd_msg_thread_done = true;
344    jcr->SD_msg_chan = 0;
345    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
346    Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
347    free_jcr(jcr);                     /* release jcr */
348    db_thread_cleanup();               /* remove thread specific data */
349 }
350
351 /*
352  * Handle the message channel (i.e. requests from the
353  *  Storage daemon).
354  * Note, we are running in a separate thread.
355  */
356 extern "C" void *msg_thread(void *arg)
357 {
358    JCR *jcr = (JCR *)arg;
359    BSOCK *sd;
360    int JobStatus;
361    char Job[MAX_NAME_LENGTH];
362    uint32_t JobFiles;
363    uint64_t JobBytes;
364    int stat;
365
366    pthread_detach(pthread_self());
367    jcr->SD_msg_chan = pthread_self();
368    pthread_cleanup_push(msg_thread_cleanup, arg);
369    sd = jcr->store_bsock;
370
371    /* Read the Storage daemon's output.
372     */
373    Dmsg0(100, "Start msg_thread loop\n");
374    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
375       Dmsg1(400, "<stored: %s", sd->msg);
376       if (sscanf(sd->msg, Job_start, Job) == 1) {
377          continue;
378       }
379       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
380                  &JobBytes)) == 4) {
381          jcr->SDJobStatus = JobStatus; /* termination status */
382          jcr->SDJobFiles = JobFiles;
383          jcr->SDJobBytes = JobBytes;
384          break;
385       }
386       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
387    }
388    if (is_bnet_error(sd)) {
389       jcr->SDJobStatus = JS_ErrorTerminated;
390    }
391    pthread_cleanup_pop(1);            /* remove and execute the handler */
392    return NULL;
393 }
394
395 void wait_for_storage_daemon_termination(JCR *jcr)
396 {
397    int cancel_count = 0;
398    /* Now wait for Storage daemon to terminate our message thread */
399    while (!jcr->sd_msg_thread_done) {
400       struct timeval tv;
401       struct timezone tz;
402       struct timespec timeout;
403
404       gettimeofday(&tv, &tz);
405       timeout.tv_nsec = 0;
406       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
407       Dmsg0(400, "I'm waiting for message thread termination.\n");
408       P(mutex);
409       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
410       V(mutex);
411       if (job_canceled(jcr)) {
412          if (jcr->SD_msg_chan) {
413             jcr->store_bsock->set_timed_out();
414             jcr->store_bsock->set_terminated();
415             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
416             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
417          }
418          cancel_count++;
419       }
420       /* Give SD 30 seconds to clean up after cancel */
421       if (cancel_count == 6) {
422          break;
423       }
424    }
425    set_jcr_job_status(jcr, JS_Terminated);
426 }
427
428 #ifdef needed
429 #define MAX_TRIES 30
430 #define WAIT_TIME 2
431 extern "C" void *device_thread(void *arg)
432 {
433    int i;
434    JCR *jcr;
435    DEVICE *dev;
436
437
438    pthread_detach(pthread_self());
439    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
440    for (i=0; i < MAX_TRIES; i++) {
441       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
442          Dmsg0(900, "Failed connecting to SD.\n");
443          continue;
444       }
445       LockRes();
446       foreach_res(dev, R_DEVICE) {
447          if (!update_device_res(jcr, dev)) {
448             Dmsg1(900, "Error updating device=%s\n", dev->name());
449          } else {
450             Dmsg1(900, "Updated Device=%s\n", dev->name());
451          }
452       }
453       UnlockRes();
454       bnet_close(jcr->store_bsock);
455       jcr->store_bsock = NULL;
456       break;
457
458    }
459    free_jcr(jcr);
460    return NULL;
461 }
462
463 /*
464  * Start a thread to handle getting Device resource information
465  *  from SD. This is called once at startup of the Director.
466  */
467 void init_device_resources()
468 {
469    int status;
470    pthread_t thid;
471
472    Dmsg0(100, "Start Device thread.\n");
473    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
474       berrno be;
475       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
476    }
477 }
478 #endif