]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
c012e2cee532921511c6661eac835db2c70de14b
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from many
7    others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    Bacula® is a registered trademark of Kern Sibbald.
15 */
16 /*
17  *
18  *   Bacula Director -- msgchan.c -- handles the message channel
19  *    to the Storage daemon and the File daemon.
20  *
21  *     Written by Kern Sibbald, August MM
22  *
23  *    This routine runs as a thread and must be thread reentrant.
24  *
25  *  Basic tasks done here:
26  *    Open a message channel with the Storage daemon
27  *      to authenticate ourself and to pass the JobId.
28  *    Create a thread to interact with the Storage daemon
29  *      who returns a job status and requests Catalog services, etc.
30  *
31  */
32
33 #include "bacula.h"
34 #include "dird.h"
35
36 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
37
38 /* Commands sent to Storage daemon */
39 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
40    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
41    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s "
42    "rerunning=%d VolSessionId=%d VolSessionTime=%d sd_client=%d "
43    "Authorization=%s\n";
44 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
45    "pool_type=%s append=%d copy=%d stripe=%d\n";
46 static char use_device[] = "use device=%s\n";
47 //static char query_device[] = _("query device=%s");
48
49 /* Response from Storage daemon */
50 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
51 static char OK_device[]  = "3000 OK use device device=%s\n";
52
53 /* Storage Daemon requests */
54 static char Job_start[]  = "3010 Job %127s start\n";
55 static char Job_end[]    =
56    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";
57
58 /* Forward referenced functions */
59 extern "C" void *msg_thread(void *arg);
60
61 BSOCK *open_sd_bsock(UAContext *ua)
62 {
63    STORE *store = ua->jcr->wstore;
64
65    if (!is_bsock_open(ua->jcr->store_bsock)) {
66       ua->send_msg(_("Connecting to Storage daemon %s at %s:%d ...\n"),
67          store->name(), store->address, store->SDport);
68       if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
69          ua->error_msg(_("Failed to connect to Storage daemon.\n"));
70          return NULL;
71       }
72    }
73    return ua->jcr->store_bsock;
74 }
75
76 void close_sd_bsock(UAContext *ua)
77 {
78    if (ua->jcr->store_bsock) {
79       ua->jcr->store_bsock->signal(BNET_TERMINATE);
80       free_bsock(ua->jcr->store_bsock);
81    }
82 }
83
84 /*
85  * Establish a message channel connection with the Storage daemon
86  * and perform authentication.
87  */
88 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
89                               int max_retry_time, int verbose)
90 {
91    BSOCK *sd = jcr->store_bsock;
92    STORE *store;
93    utime_t heart_beat;
94
95    if (is_bsock_open(sd)) {
96       return true;                    /* already connected */
97    }
98    if (!sd) {
99       sd = new_bsock();
100    }
101
102    /* If there is a write storage use it */
103    if (jcr->wstore) {
104       store = jcr->wstore;
105    } else {
106       store = jcr->rstore;
107    }
108
109    if (store->heartbeat_interval) {
110       heart_beat = store->heartbeat_interval;
111    } else {
112       heart_beat = director->heartbeat_interval;
113    }
114
115    /*
116     *  Open message channel with the Storage daemon
117     */
118    Dmsg2(100, "Connect to Storage daemon %s:%d\n", store->address,
119       store->SDport);
120    sd->set_source_address(director->DIRsrc_addr);
121    if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
122          store->address, NULL, store->SDport, verbose)) {
123       sd = NULL;
124    }
125
126    if (sd == NULL) {
127       return false;
128    }
129    sd->res = (RES *)store;        /* save pointer to other end */
130    jcr->store_bsock = sd;
131
132    if (!authenticate_storage_daemon(jcr, store)) {
133       sd->close();
134       return false;
135    }
136    return true;
137 }
138
139 /*
140  * Here we ask the SD to send us the info for a
141  *  particular device resource.
142  */
143 #ifdef xxx
144 bool update_device_res(JCR *jcr, DEVICE *dev)
145 {
146    POOL_MEM device_name;
147    BSOCK *sd;
148    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
149       return false;
150    }
151    sd = jcr->store_bsock;
152    pm_strcpy(device_name, dev->name());
153    bash_spaces(device_name);
154    sd->fsend(query_device, device_name.c_str());
155    Dmsg1(100, ">stored: %s\n", sd->msg);
156    /* The data is returned through Device_update */
157    if (bget_dirmsg(sd) <= 0) {
158       return false;
159    }
160    return true;
161 }
162 #endif
163
164 static char OKbootstrap[] = "3000 OK bootstrap\n";
165
166 /*
167  * Start a job with the Storage daemon
168  */
169 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
170 {
171    bool ok = true;
172    STORE *storage;
173    BSOCK *sd;
174    char sd_auth_key[100];
175    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
176    POOL_MEM job_name, client_name, fileset_name;
177    int copy = 0;
178    int stripe = 0;
179    char ed1[30], ed2[30];
180    int sd_client;
181
182    sd = jcr->store_bsock;
183    /*
184     * Now send JobId and permissions, and get back the authorization key.
185     */
186    pm_strcpy(job_name, jcr->job->name());
187    bash_spaces(job_name);
188    pm_strcpy(client_name, jcr->client->name());
189    bash_spaces(client_name);
190    pm_strcpy(fileset_name, jcr->fileset->name());
191    bash_spaces(fileset_name);
192    if (jcr->fileset->MD5[0] == 0) {
193       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
194    }
195    /* If rescheduling, cancel the previous incarnation of this job
196     *  with the SD, which might be waiting on the FD connection.
197     *  If we do not cancel it the SD will not accept a new connection
198     *  for the same jobid.
199     */
200    if (jcr->reschedule_count) {
201       sd->fsend("cancel Job=%s\n", jcr->Job);
202       while (sd->recv() >= 0)
203          { }
204    }
205
206    sd_client = jcr->sd_client;
207    if (jcr->sd_auth_key) {
208       bstrncpy(sd_auth_key, jcr->sd_auth_key, sizeof(sd_auth_key));
209    } else {
210       bstrncpy(sd_auth_key, "dummy", sizeof(sd_auth_key));
211    }
212
213    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
214              job_name.c_str(), client_name.c_str(),
215              jcr->getJobType(), jcr->getJobLevel(),
216              fileset_name.c_str(), !jcr->pool->catalog_files,
217              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
218              jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
219              edit_int64(jcr->spool_size, ed2), jcr->rerunning,
220              jcr->VolSessionId, jcr->VolSessionTime, sd_client,
221              sd_auth_key);
222
223    Dmsg1(100, ">stored: %s", sd->msg);
224    Dmsg2(100, "=== rstore=%p wstore=%p\n", rstore, wstore);
225    if (bget_dirmsg(sd) > 0) {
226        Dmsg1(100, "<stored: %s", sd->msg);
227        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
228                   &jcr->VolSessionTime, &sd_auth_key) != 3) {
229           Dmsg1(100, "BadJob=%s\n", sd->msg);
230           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
231           return false;
232        } else {
233           bfree_and_null(jcr->sd_auth_key);
234           jcr->sd_auth_key = bstrdup(sd_auth_key);
235           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
236        }
237    } else {
238       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
239          sd->bstrerror());
240       return false;
241    }
242
243    if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
244        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
245       return false;
246    }
247
248    /*
249     * We have two loops here. The first comes from the
250     *  Storage = associated with the Job, and we need
251     *  to attach to each one.
252     * The inner loop loops over all the alternative devices
253     *  associated with each Storage. It selects the first
254     *  available one.
255     *
256     */
257    /* Do read side of storage daemon */
258    if (ok && rstore) {
259       /* For the moment, only migrate, copy and vbackup have rpool */
260       if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) ||
261            (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
262          pm_strcpy(pool_type, jcr->rpool->pool_type);
263          pm_strcpy(pool_name, jcr->rpool->name());
264       } else {
265          pm_strcpy(pool_type, jcr->pool->pool_type);
266          pm_strcpy(pool_name, jcr->pool->name());
267       }
268       bash_spaces(pool_type);
269       bash_spaces(pool_name);
270       foreach_alist(storage, rstore) {
271          Dmsg1(100, "Rstore=%s\n", storage->name());
272          pm_strcpy(store_name, storage->name());
273          bash_spaces(store_name);
274          if (jcr->media_type) {
275             pm_strcpy(media_type, jcr->media_type);  /* user override */
276          } else {
277             pm_strcpy(media_type, storage->media_type);
278          }
279          bash_spaces(media_type);
280          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
281                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
282          Dmsg1(100, "rstore >stored: %s", sd->msg);
283          DEVICE *dev;
284          /* Loop over alternative storage Devices until one is OK */
285          foreach_alist(dev, storage->device) {
286             pm_strcpy(device_name, dev->name());
287             bash_spaces(device_name);
288             sd->fsend(use_device, device_name.c_str());
289             Dmsg1(100, ">stored: %s", sd->msg);
290          }
291          sd->signal(BNET_EOD);           /* end of Devices */
292       }
293       sd->signal(BNET_EOD);              /* end of Storages */
294       if (bget_dirmsg(sd) > 0) {
295          Dmsg1(100, "<stored: %s", sd->msg);
296          /* ****FIXME**** save actual device name */
297          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
298       } else {
299          ok = false;
300       }
301       if (ok) {
302          Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to read.\n"), device_name.c_str());
303       }
304    }
305
306    /* Do write side of storage daemon */
307    if (ok && wstore) {
308       pm_strcpy(pool_type, jcr->pool->pool_type);
309       pm_strcpy(pool_name, jcr->pool->name());
310       bash_spaces(pool_type);
311       bash_spaces(pool_name);
312       foreach_alist(storage, wstore) {
313          Dmsg1(100, "Wstore=%s\n", storage->name());
314          pm_strcpy(store_name, storage->name());
315          bash_spaces(store_name);
316          pm_strcpy(media_type, storage->media_type);
317          bash_spaces(media_type);
318          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
319                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
320
321          Dmsg1(100, "wstore >stored: %s", sd->msg);
322          DEVICE *dev;
323          /* Loop over alternative storage Devices until one is OK */
324          foreach_alist(dev, storage->device) {
325             pm_strcpy(device_name, dev->name());
326             bash_spaces(device_name);
327             sd->fsend(use_device, device_name.c_str());
328             Dmsg1(100, ">stored: %s", sd->msg);
329          }
330          sd->signal(BNET_EOD);           /* end of Devices */
331       }
332       sd->signal(BNET_EOD);              /* end of Storages */
333       if (bget_dirmsg(sd) > 0) {
334          Dmsg1(100, "<stored: %s", sd->msg);
335          /* ****FIXME**** save actual device name */
336          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
337       } else {
338          ok = false;
339       }
340       if (ok) {
341          Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to write.\n"), device_name.c_str());
342       }
343    }
344    if (!ok) {
345       POOL_MEM err_msg;
346       if (sd->msg[0]) {
347          pm_strcpy(err_msg, sd->msg); /* save message */
348          Jmsg(jcr, M_FATAL, 0, _("\n"
349               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
350               device_name.c_str(), err_msg.c_str()/* sd->msg */);
351       } else {
352          Jmsg(jcr, M_FATAL, 0, _("\n"
353               "     Storage daemon didn't accept Device \"%s\" command.\n"),
354               device_name.c_str());
355       }
356    }
357    return ok;
358 }
359
360 /*
361  * Start a thread to handle Storage daemon messages and
362  *  Catalog requests.
363  */
364 bool start_storage_daemon_message_thread(JCR *jcr)
365 {
366    int status;
367    pthread_t thid;
368
369    jcr->inc_use_count();              /* mark in use by msg thread */
370    jcr->sd_msg_thread_done = false;
371    jcr->SD_msg_chan = 0;
372    Dmsg0(150, "Start SD msg_thread.\n");
373    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
374       berrno be;
375       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
376    }
377    /* Wait for thread to start */
378    while (jcr->SD_msg_chan == 0) {
379       bmicrosleep(0, 50);
380       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
381          return false;
382       }
383    }
384    Dmsg1(150, "SD msg_thread started. use=%d\n", jcr->use_count());
385    return true;
386 }
387
388 extern "C" void msg_thread_cleanup(void *arg)
389 {
390    JCR *jcr = (JCR *)arg;
391    db_end_transaction(jcr, jcr->db);        /* terminate any open transaction */
392    jcr->lock();
393    jcr->sd_msg_thread_done = true;
394    jcr->SD_msg_chan = 0;
395    jcr->unlock();
396    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
397    Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
398    db_thread_cleanup(jcr->db);              /* remove thread specific data */
399    free_jcr(jcr);                           /* release jcr */
400 }
401
402 /*
403  * Handle the message channel (i.e. requests from the
404  *  Storage daemon).
405  * Note, we are running in a separate thread.
406  */
407 extern "C" void *msg_thread(void *arg)
408 {
409    JCR *jcr = (JCR *)arg;
410    BSOCK *sd;
411    int JobStatus;
412    int n;
413    char Job[MAX_NAME_LENGTH];
414    uint32_t JobFiles, JobErrors;
415    uint64_t JobBytes;
416
417    pthread_detach(pthread_self());
418    set_jcr_in_tsd(jcr);
419    jcr->SD_msg_chan = pthread_self();
420    pthread_cleanup_push(msg_thread_cleanup, arg);
421    sd = jcr->store_bsock;
422
423    /* Read the Storage daemon's output.
424     */
425    Dmsg0(100, "Start msg_thread loop\n");
426    n = 0;
427    while (!job_canceled(jcr) && (n=bget_dirmsg(sd)) >= 0) {
428       Dmsg1(400, "<stored: %s", sd->msg);
429       if (sscanf(sd->msg, Job_start, Job) == 1) {
430          continue;
431       }
432       if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
433                  &JobBytes, &JobErrors) == 5) {
434          jcr->SDJobStatus = JobStatus; /* termination status */
435          jcr->SDJobFiles = JobFiles;
436          jcr->SDJobBytes = JobBytes;
437          jcr->SDErrors = JobErrors;
438          break;
439       }
440       Dmsg1(400, "end loop use=%d\n", jcr->use_count());
441    }
442    if (n == BNET_HARDEOF && jcr->getJobStatus() != JS_Canceled) {
443       /*
444        * This probably should be M_FATAL, but I am not 100% sure
445        *  that this return *always* corresponds to a dropped line.
446        */
447       Qmsg(jcr, M_ERROR, 0, _("Director's connection to SD for this Job was lost.\n"));
448    }
449    if (jcr->getJobStatus() == JS_Canceled) {
450       jcr->SDJobStatus = JS_Canceled;
451    } else if (sd->is_error()) {
452       jcr->SDJobStatus = JS_ErrorTerminated;
453    }
454    pthread_cleanup_pop(1);            /* remove and execute the handler */
455    return NULL;
456 }
457
458 void wait_for_storage_daemon_termination(JCR *jcr)
459 {
460    int cancel_count = 0;
461    /* Now wait for Storage daemon to terminate our message thread */
462    while (!jcr->sd_msg_thread_done) {
463       struct timeval tv;
464       struct timezone tz;
465       struct timespec timeout;
466
467       gettimeofday(&tv, &tz);
468       timeout.tv_nsec = 0;
469       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
470       Dmsg0(400, "I'm waiting for message thread termination.\n");
471       P(mutex);
472       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
473       V(mutex);
474       if (jcr->is_canceled()) {
475          if (jcr->SD_msg_chan) {
476             jcr->store_bsock->set_timed_out();
477             jcr->store_bsock->set_terminated();
478             sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL);
479          }
480          cancel_count++;
481       }
482       /* Give SD 30 seconds to clean up after cancel */
483       if (cancel_count == 6) {
484          break;
485       }
486    }
487    jcr->setJobStatus(JS_Terminated);
488 }
489
490 /*
491  * Send bootstrap file to Storage daemon.
492  *  This is used for restore, verify VolumeToCatalog, migration,
493  *    and copy Jobs.
494  */
495 bool send_bootstrap_file(JCR *jcr, BSOCK *sd)
496 {
497    FILE *bs;
498    char buf[1000];
499    const char *bootstrap = "bootstrap\n";
500
501    Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap);
502    if (!jcr->RestoreBootstrap) {
503       return true;
504    }
505    bs = fopen(jcr->RestoreBootstrap, "rb");
506    if (!bs) {
507       berrno be;
508       Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
509          jcr->RestoreBootstrap, be.bstrerror());
510       jcr->setJobStatus(JS_ErrorTerminated);
511       return false;
512    }
513    sd->fsend(bootstrap);
514    while (fgets(buf, sizeof(buf), bs)) {
515       sd->fsend("%s", buf);
516    }
517    sd->signal(BNET_EOD);
518    fclose(bs);
519    if (jcr->unlink_bsr) {
520       unlink(jcr->RestoreBootstrap);
521       jcr->unlink_bsr = false;
522    }
523    return true;
524 }
525
526
527 #ifdef needed
528 #define MAX_TRIES 30
529 #define WAIT_TIME 2
530 extern "C" void *device_thread(void *arg)
531 {
532    int i;
533    JCR *jcr;
534    DEVICE *dev;
535
536
537    pthread_detach(pthread_self());
538    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
539    for (i=0; i < MAX_TRIES; i++) {
540       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
541          Dmsg0(900, "Failed connecting to SD.\n");
542          continue;
543       }
544       LockRes();
545       foreach_res(dev, R_DEVICE) {
546          if (!update_device_res(jcr, dev)) {
547             Dmsg1(900, "Error updating device=%s\n", dev->name());
548          } else {
549             Dmsg1(900, "Updated Device=%s\n", dev->name());
550          }
551       }
552       UnlockRes();
553       free_bsock(jcr->store_bsock);
554       break;
555
556    }
557    free_jcr(jcr);
558    return NULL;
559 }
560
561 /*
562  * Start a thread to handle getting Device resource information
563  *  from SD. This is called once at startup of the Director.
564  */
565 void init_device_resources()
566 {
567    int status;
568    pthread_t thid;
569
570    Dmsg0(100, "Start Device thread.\n");
571    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
572       berrno be;
573       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
574    }
575 }
576 #endif