]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
Merge branch 'master' into basejobv3
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *
30  *   Bacula Director -- msgchan.c -- handles the message channel
31  *    to the Storage daemon and the File daemon.
32  *
33  *     Kern Sibbald, August MM
34  *
35  *    This routine runs as a thread and must be thread reentrant.
36  *
37  *  Basic tasks done here:
38  *    Open a message channel with the Storage daemon
39  *      to authenticate ourself and to pass the JobId.
40  *    Create a thread to interact with the Storage daemon
41  *      who returns a job status and requests Catalog services, etc.
42  *
43  *   Version $Id$
44  */
45
46 #include "bacula.h"
47 #include "dird.h"
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 /* Commands sent to Storage daemon */
52 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
53    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd = new_bsock();
80    STORE *store;
81    utime_t heart_beat;    
82
83    if (jcr->store_bsock) {
84       return true;                    /* already connected */
85    }
86
87    /* If there is a write storage use it */
88    if (jcr->wstore) {
89       store = jcr->wstore;
90    } else {
91       store = jcr->rstore;
92    }
93
94    if (store->heartbeat_interval) {
95       heart_beat = store->heartbeat_interval;
96    } else {           
97       heart_beat = director->heartbeat_interval;
98    }
99
100    /*
101     *  Open message channel with the Storage daemon
102     */
103    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
104       store->SDport);
105    sd->set_source_address(director->DIRsrc_addr);
106    if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
107          store->address, NULL, store->SDport, verbose)) {
108       sd->destroy();
109       sd = NULL;
110    }
111
112    if (sd == NULL) {
113       return false;
114    }
115    sd->res = (RES *)store;        /* save pointer to other end */
116    jcr->store_bsock = sd;
117
118    if (!authenticate_storage_daemon(jcr, store)) {
119       sd->close();
120       jcr->store_bsock = NULL;
121       return false;
122    }
123    return true;
124 }
125
126 /*
127  * Here we ask the SD to send us the info for a 
128  *  particular device resource.
129  */
130 #ifdef xxx
131 bool update_device_res(JCR *jcr, DEVICE *dev)
132 {
133    POOL_MEM device_name; 
134    BSOCK *sd;
135    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
136       return false;
137    }
138    sd = jcr->store_bsock;
139    pm_strcpy(device_name, dev->name());
140    bash_spaces(device_name);
141    sd->fsend(query_device, device_name.c_str());
142    Dmsg1(100, ">stored: %s\n", sd->msg);
143    /* The data is returned through Device_update */
144    if (bget_dirmsg(sd) <= 0) {
145       return false;
146    }
147    return true;
148 }
149 #endif
150
151 static char OKbootstrap[] = "3000 OK bootstrap\n";
152
153 /*
154  * Start a job with the Storage daemon
155  */
156 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
157 {
158    bool ok = true;
159    STORE *storage;
160    BSOCK *sd;
161    char auth_key[100];
162    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
163    POOL_MEM job_name, client_name, fileset_name;
164    int copy = 0;
165    int stripe = 0;
166    char ed1[30], ed2[30];
167
168    sd = jcr->store_bsock;
169    /*
170     * Now send JobId and permissions, and get back the authorization key.
171     */
172    pm_strcpy(job_name, jcr->job->name());
173    bash_spaces(job_name);
174    pm_strcpy(client_name, jcr->client->name());
175    bash_spaces(client_name);
176    pm_strcpy(fileset_name, jcr->fileset->name());
177    bash_spaces(fileset_name);
178    if (jcr->fileset->MD5[0] == 0) {
179       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
180    }
181    /* If rescheduling, cancel the previous incarnation of this job
182     *  with the SD, which might be waiting on the FD connection.
183     *  If we do not cancel it the SD will not accept a new connection
184     *  for the same jobid.
185     */
186    if (jcr->reschedule_count) {
187       sd->fsend("cancel Job=%s\n", jcr->Job);
188       while (sd->recv() >= 0)
189          { }
190    } 
191    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
192              job_name.c_str(), client_name.c_str(), 
193              jcr->get_JobType(), jcr->get_JobLevel(),
194              fileset_name.c_str(), !jcr->pool->catalog_files,
195              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
196              jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
197              edit_int64(jcr->spool_size, ed2));
198    Dmsg1(100, ">stored: %s", sd->msg);
199    if (bget_dirmsg(sd) > 0) {
200        Dmsg1(100, "<stored: %s", sd->msg);
201        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
202                   &jcr->VolSessionTime, &auth_key) != 3) {
203           Dmsg1(100, "BadJob=%s\n", sd->msg);
204           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
205           return false;
206        } else {
207           bfree_and_null(jcr->sd_auth_key);
208           jcr->sd_auth_key = bstrdup(auth_key);
209           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
210        }
211    } else {
212       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
213          sd->bstrerror());
214       return false;
215    }
216
217    if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
218        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
219       return false;
220    }
221
222    /*
223     * We have two loops here. The first comes from the 
224     *  Storage = associated with the Job, and we need 
225     *  to attach to each one.
226     * The inner loop loops over all the alternative devices
227     *  associated with each Storage. It selects the first
228     *  available one.
229     *
230     */
231    /* Do read side of storage daemon */
232    if (ok && rstore) {
233       /* For the moment, only migrate, copy and vbackup have rpool */
234       if (jcr->get_JobType() == JT_MIGRATE || jcr->get_JobType() == JT_COPY ||
235            (jcr->get_JobType() == JT_BACKUP && jcr->get_JobLevel() == L_VIRTUAL_FULL)) {
236          pm_strcpy(pool_type, jcr->rpool->pool_type);
237          pm_strcpy(pool_name, jcr->rpool->name());
238       } else {
239          pm_strcpy(pool_type, jcr->pool->pool_type);
240          pm_strcpy(pool_name, jcr->pool->name());
241       }
242       bash_spaces(pool_type);
243       bash_spaces(pool_name);
244       foreach_alist(storage, rstore) {
245          Dmsg1(100, "Rstore=%s\n", storage->name());
246          pm_strcpy(store_name, storage->name());
247          bash_spaces(store_name);
248          pm_strcpy(media_type, storage->media_type);
249          bash_spaces(media_type);
250          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
251                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
252          Dmsg1(100, "rstore >stored: %s", sd->msg);
253          DEVICE *dev;
254          /* Loop over alternative storage Devices until one is OK */
255          foreach_alist(dev, storage->device) {
256             pm_strcpy(device_name, dev->name());
257             bash_spaces(device_name);
258             sd->fsend(use_device, device_name.c_str());
259             Dmsg1(100, ">stored: %s", sd->msg);
260          }
261          sd->signal(BNET_EOD);           /* end of Devices */
262       }
263       sd->signal(BNET_EOD);              /* end of Storages */
264       if (bget_dirmsg(sd) > 0) {
265          Dmsg1(100, "<stored: %s", sd->msg);
266          /* ****FIXME**** save actual device name */
267          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
268       } else {
269          ok = false;
270       }
271    }
272
273    /* Do write side of storage daemon */
274    if (ok && wstore) {
275       pm_strcpy(pool_type, jcr->pool->pool_type);
276       pm_strcpy(pool_name, jcr->pool->name());
277       bash_spaces(pool_type);
278       bash_spaces(pool_name);
279       foreach_alist(storage, wstore) {
280          pm_strcpy(store_name, storage->name());
281          bash_spaces(store_name);
282          pm_strcpy(media_type, storage->media_type);
283          bash_spaces(media_type);
284          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
285                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
286
287          Dmsg1(100, "wstore >stored: %s", sd->msg);
288          DEVICE *dev;
289          /* Loop over alternative storage Devices until one is OK */
290          foreach_alist(dev, storage->device) {
291             pm_strcpy(device_name, dev->name());
292             bash_spaces(device_name);
293             sd->fsend(use_device, device_name.c_str());
294             Dmsg1(100, ">stored: %s", sd->msg);
295          }
296          sd->signal(BNET_EOD);           /* end of Devices */
297       }
298       sd->signal(BNET_EOD);              /* end of Storages */
299       if (bget_dirmsg(sd) > 0) {
300          Dmsg1(100, "<stored: %s", sd->msg);
301          /* ****FIXME**** save actual device name */
302          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
303       } else {
304          ok = false;
305       }
306    }
307    if (!ok) {
308       POOL_MEM err_msg;
309       if (sd->msg[0]) {
310          pm_strcpy(err_msg, sd->msg); /* save message */
311          Jmsg(jcr, M_FATAL, 0, _("\n"
312               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
313               device_name.c_str(), err_msg.c_str()/* sd->msg */);
314       } else { 
315          Jmsg(jcr, M_FATAL, 0, _("\n"
316               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
317               device_name.c_str());
318       }
319    } else {
320       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
321    }
322    return ok;
323 }
324
325 /*
326  * Start a thread to handle Storage daemon messages and
327  *  Catalog requests.
328  */
329 bool start_storage_daemon_message_thread(JCR *jcr)
330 {
331    int status;
332    pthread_t thid;
333
334    jcr->inc_use_count();              /* mark in use by msg thread */
335    jcr->sd_msg_thread_done = false;
336    jcr->SD_msg_chan = 0;
337    Dmsg0(100, "Start SD msg_thread.\n");
338    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
339       berrno be;
340       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
341    }
342    /* Wait for thread to start */
343    while (jcr->SD_msg_chan == 0) {
344       bmicrosleep(0, 50);
345       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
346          return false;
347       }
348    }
349    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
350    return true;
351 }
352
353 extern "C" void msg_thread_cleanup(void *arg)
354 {
355    JCR *jcr = (JCR *)arg;
356    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
357    jcr->sd_msg_thread_done = true;
358    jcr->SD_msg_chan = 0;
359    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
360    Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
361    free_jcr(jcr);                     /* release jcr */
362    db_thread_cleanup();               /* remove thread specific data */
363 }
364
365 /*
366  * Handle the message channel (i.e. requests from the
367  *  Storage daemon).
368  * Note, we are running in a separate thread.
369  */
370 extern "C" void *msg_thread(void *arg)
371 {
372    JCR *jcr = (JCR *)arg;
373    BSOCK *sd;
374    int JobStatus;
375    char Job[MAX_NAME_LENGTH];
376    uint32_t JobFiles, JobErrors;
377    uint64_t JobBytes;
378
379    pthread_detach(pthread_self());
380    set_jcr_in_tsd(jcr);
381    jcr->SD_msg_chan = pthread_self();
382    pthread_cleanup_push(msg_thread_cleanup, arg);
383    sd = jcr->store_bsock;
384
385    /* Read the Storage daemon's output.
386     */
387    Dmsg0(100, "Start msg_thread loop\n");
388    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
389       Dmsg1(400, "<stored: %s", sd->msg);
390       if (sscanf(sd->msg, Job_start, Job) == 1) {
391          continue;
392       }
393       if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
394                  &JobBytes, &JobErrors) == 5) {
395          jcr->SDJobStatus = JobStatus; /* termination status */
396          jcr->SDJobFiles = JobFiles;
397          jcr->SDJobBytes = JobBytes;
398          jcr->SDErrors = JobErrors;
399          break;
400       }
401       Dmsg1(400, "end loop use=%d\n", jcr->use_count());
402    }
403    if (is_bnet_error(sd)) {
404       jcr->SDJobStatus = JS_ErrorTerminated;
405    }
406    pthread_cleanup_pop(1);            /* remove and execute the handler */
407    return NULL;
408 }
409
410 void wait_for_storage_daemon_termination(JCR *jcr)
411 {
412    int cancel_count = 0;
413    /* Now wait for Storage daemon to terminate our message thread */
414    while (!jcr->sd_msg_thread_done) {
415       struct timeval tv;
416       struct timezone tz;
417       struct timespec timeout;
418
419       gettimeofday(&tv, &tz);
420       timeout.tv_nsec = 0;
421       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
422       Dmsg0(400, "I'm waiting for message thread termination.\n");
423       P(mutex);
424       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
425       V(mutex);
426       if (job_canceled(jcr)) {
427          if (jcr->SD_msg_chan) {
428             jcr->store_bsock->set_timed_out();
429             jcr->store_bsock->set_terminated();
430             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
431             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
432          }
433          cancel_count++;
434       }
435       /* Give SD 30 seconds to clean up after cancel */
436       if (cancel_count == 6) {
437          break;
438       }
439    }
440    set_jcr_job_status(jcr, JS_Terminated);
441 }
442
443 /*
444  * Send bootstrap file to Storage daemon.
445  *  This is used for restore, verify VolumeToCatalog, migration,
446  *    and copy Jobs.
447  */
448 bool send_bootstrap_file(JCR *jcr, BSOCK *sd)
449 {
450    FILE *bs;
451    char buf[1000];
452    const char *bootstrap = "bootstrap\n";
453
454    Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap);
455    if (!jcr->RestoreBootstrap) {
456       return true;
457    }
458    bs = fopen(jcr->RestoreBootstrap, "rb");
459    if (!bs) {
460       berrno be;
461       Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
462          jcr->RestoreBootstrap, be.bstrerror());
463       set_jcr_job_status(jcr, JS_ErrorTerminated);
464       return false;
465    }
466    sd->fsend(bootstrap);
467    while (fgets(buf, sizeof(buf), bs)) {
468       sd->fsend("%s", buf);
469    }
470    sd->signal(BNET_EOD);
471    fclose(bs);
472    if (jcr->unlink_bsr) {
473       unlink(jcr->RestoreBootstrap);
474       jcr->unlink_bsr = false;
475    }                         
476    return true;
477 }
478
479
480 #ifdef needed
481 #define MAX_TRIES 30
482 #define WAIT_TIME 2
483 extern "C" void *device_thread(void *arg)
484 {
485    int i;
486    JCR *jcr;
487    DEVICE *dev;
488
489
490    pthread_detach(pthread_self());
491    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
492    for (i=0; i < MAX_TRIES; i++) {
493       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
494          Dmsg0(900, "Failed connecting to SD.\n");
495          continue;
496       }
497       LockRes();
498       foreach_res(dev, R_DEVICE) {
499          if (!update_device_res(jcr, dev)) {
500             Dmsg1(900, "Error updating device=%s\n", dev->name());
501          } else {
502             Dmsg1(900, "Updated Device=%s\n", dev->name());
503          }
504       }
505       UnlockRes();
506       bnet_close(jcr->store_bsock);
507       jcr->store_bsock = NULL;
508       break;
509
510    }
511    free_jcr(jcr);
512    return NULL;
513 }
514
515 /*
516  * Start a thread to handle getting Device resource information
517  *  from SD. This is called once at startup of the Director.
518  */
519 void init_device_resources()
520 {
521    int status;
522    pthread_t thid;
523
524    Dmsg0(100, "Start Device thread.\n");
525    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
526       berrno be;
527       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
528    }
529 }
530 #endif