2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2017 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
20 * Bacula Director -- msgchan.c -- handles the message channel
21 * to the Storage daemon and the File daemon.
23 * Written by Kern Sibbald, August MM
25 * This routine runs as a thread and must be thread reentrant.
27 * Basic tasks done here:
28 * Open a message channel with the Storage daemon
29 * to authenticate ourselves and to pass the JobId.
30 * Create a thread to interact with the Storage daemon,
31 * which returns a job status and requests Catalog services, etc.
/*
 * File-scope state for the Director <-> Storage daemon message channel:
 * the mutex handed to pthread_cond_timedwait() in
 * wait_for_storage_daemon_termination(), and the protocol format strings.
 */
37 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
39 /* Commands sent to Storage daemon */
40 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
41 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
42 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s "
43 "rerunning=%d VolSessionId=%d VolSessionTime=%d sd_client=%d "
45 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
46 "pool_type=%s append=%d copy=%d stripe=%d\n";
47 static char use_device[] = "use device=%s\n";
48 //static char query_device[] = _("query device=%s");
/*
 * The formats below are parsed with sscanf().  A "%Ns" conversion stores
 * up to N characters PLUS a terminating NUL, so every width must be
 * strictly smaller than its destination buffer:
 *   OKjob     Authorization -> char sd_auth_key[100] in
 *                              start_storage_daemon_job()        => %99s
 *   OK_device device name   -> POOL_MEM device_name; bounded to the same
 *                              127-character name limit used for Job.
 *                              NOTE(review): assumes POOL_MEM buffers start
 *                              at >= MAX_NAME_LENGTH bytes -- confirm.
 *   Job_start/Job_end Job   -> char Job[MAX_NAME_LENGTH]; %127s assumes
 *                              MAX_NAME_LENGTH >= 128 -- confirm.
 *   Job_end   ErrMsg        -> msg_thread()'s ErrMsg buffer; %255s assumes
 *                              that buffer is 256 bytes -- confirm.
 * Job_end's JobFiles is read with %u to match its uint32_t destination.
 */
50 /* Response from Storage daemon */
51 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%99s\n";
52 static char OK_device[] = "3000 OK use device device=%127s\n";
54 /* Storage Daemon requests */
55 static char Job_start[] = "3010 Job %127s start\n";
56 static char Job_end[] =
57 "3099 Job %127s end JobStatus=%d JobFiles=%u JobBytes=%lld JobErrors=%u ErrMsg=%255s\n";
59 /* Forward referenced functions */
60 extern "C" void *msg_thread(void *arg);
/*
 * open_sd_bsock -- return ua->jcr's Storage daemon socket, opening it
 * first when ua->jcr->store_bsock is not already open.
 *
 * Before connecting, a progress message naming ua->jcr->wstore (name,
 * address, SDport) is sent to the user agent.  The connection is made with
 * connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1), i.e.
 * retry_interval 10, max_retry_time SDConnectTimeout, verbose on.  On
 * failure an error message is sent to the user agent.
 *
 * Returns ua->jcr->store_bsock on the visible success path.
 * NOTE(review): the statements after the error message are not present in
 * this listing; presumably they return NULL -- confirm.
 * NOTE(review): `store` (ua->jcr->wstore) is dereferenced for the progress
 * message with no visible NULL check.
 */
62 BSOCK *open_sd_bsock(UAContext *ua)
64 STORE *store = ua->jcr->wstore;
66 if (!is_bsock_open(ua->jcr->store_bsock)) {
67 ua->send_msg(_("Connecting to Storage daemon %s at %s:%d ...\n"),
68 store->name(), store->address, store->SDport);
69 if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
70 ua->error_msg(_("Failed to connect to Storage daemon.\n"));
74 return ua->jcr->store_bsock;
/*
 * close_sd_bsock -- if ua->jcr has a Storage daemon socket, send it the
 * BNET_TERMINATE signal and release it with free_bsock().
 * NOTE(review): whether free_bsock() also clears ua->jcr->store_bsock is
 * not visible here; callers that reuse the JCR rely on it being cleared.
 */
77 void close_sd_bsock(UAContext *ua)
79 if (ua->jcr->store_bsock) {
80 ua->jcr->store_bsock->signal(BNET_TERMINATE);
81 free_bsock(ua->jcr->store_bsock);
86 * Establish a message channel connection with the Storage daemon
87 * and perform authentication.
/*
 * connect_to_storage_daemon -- make jcr->store_bsock an open,
 * authenticated connection to the Storage daemon.
 *
 * retry_interval, max_retry_time and verbose are passed unchanged to
 * BSOCK::connect().
 *
 * Visible behaviour:
 *  - returns true at once if jcr->store_bsock is already open;
 *  - heartbeat: the storage resource's heartbeat_interval when non-zero,
 *    otherwise the Director's heartbeat_interval;
 *  - the Director's DIRsrc_addr is set as the socket's source address;
 *  - on connect() failure with jcr->store_bsock NULL, the in-line comment
 *    says the locally created socket is freed;
 *  - on success the socket's res is pointed at the storage resource, the
 *    socket is saved in jcr->store_bsock, and
 *    authenticate_storage_daemon() is called.
 * NOTE(review): this listing omits several lines (the choice of `store`
 * under "If there is a write storage use it", the socket allocation, and
 * the failure returns); check the exact failure paths against the full
 * source.
 */
89 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
90 int max_retry_time, int verbose)
92 BSOCK *sd = jcr->store_bsock;
96 if (is_bsock_open(sd)) {
97 return true; /* already connected */
103 /* If there is a write storage use it */
110 if (store->heartbeat_interval) {
111 heart_beat = store->heartbeat_interval;
113 heart_beat = director->heartbeat_interval;
117 * Open message channel with the Storage daemon
119 Dmsg2(100, "Connect to Storage daemon %s:%d\n", store->address,
121 sd->set_source_address(director->DIRsrc_addr);
122 if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
123 store->address, NULL, store->SDport, verbose)) {
125 if (!jcr->store_bsock) { /* The bsock was locally created, so we free it here */
134 sd->res = (RES *)store; /* save pointer to other end */
135 jcr->store_bsock = sd;
137 if (!authenticate_storage_daemon(jcr, store)) {
145 * Here we ask the SD to send us the info for a
146 * particular device resource.
/*
 * update_device_res -- request information about dev from the Storage
 * daemon.
 *
 * Connects via connect_to_storage_daemon(jcr, 5, 30, 0) (retry_interval 5,
 * max_retry_time 30, not verbose).  It then sends the space-bashed
 * dev->name() using the query_device format and reads one reply with
 * bget_dirmsg().  The in-line comment says the data itself arrives through
 * Device_update.  A bget_dirmsg() result <= 0 takes the failure branch,
 * whose body is not shown in this listing.
 * NOTE(review): query_device is commented out near the top of this file,
 * so this function only builds if query_device is defined elsewhere or the
 * code is excluded by a conditional not visible here -- confirm.
 */
149 bool update_device_res(JCR *jcr, DEVICE *dev)
151 POOL_MEM device_name;
153 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
156 sd = jcr->store_bsock;
157 pm_strcpy(device_name, dev->name());
158 bash_spaces(device_name);
159 sd->fsend(query_device, device_name.c_str());
160 Dmsg1(100, ">stored: %s\n", sd->msg);
161 /* The data is returned through Device_update */
162 if (bget_dirmsg(sd) <= 0) {
169 static char OKbootstrap[] = "3000 OK bootstrap\n";
172 * Start a job with the Storage daemon
/*
 * start_storage_daemon_job -- send the Job command to the Storage daemon on
 * jcr->store_bsock, then announce the storages the job may use.
 *
 *   rstore   - storages offered for reading (sent with append=0)
 *   wstore   - storages offered for writing (sent with append=1)
 *   send_bsr - when true, send the bootstrap file and expect OKbootstrap
 *
 * Visible sequence:
 *   1. Space-bash the job, client (or "**Dummy**") and fileset names.  An
 *      empty fileset MD5 is replaced by "**Dummy**".
 *   2. If jcr->reschedule_count is non-zero, send "cancel Job=..." and read
 *      replies until recv() goes negative (see the comment below for why).
 *   3. Send jobcmd.  Parse the OKjob reply into VolSessionId,
 *      VolSessionTime and a fresh jcr->sd_auth_key.  A non-matching reply
 *      gets an M_FATAL "rejected Job command" message.
 *   4. Optionally send the bootstrap file.
 *   5. For each read, then each write, storage: send use_storage, then one
 *      use_device per device, then BNET_EOD (end of Devices).  After all
 *      storages, send BNET_EOD (end of Storages).  Parse the OK_device reply
 *      into device_name and log it.
 * The return statements are not visible in this listing.
 */
174 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
179 char sd_auth_key[100];
180 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
181 POOL_MEM job_name, client_name, fileset_name;
184 char ed1[30], ed2[30];
187 sd = jcr->store_bsock;
189 * Now send JobId and permissions, and get back the authorization key.
191 pm_strcpy(job_name, jcr->job->name());
192 bash_spaces(job_name);
194 pm_strcpy(client_name, jcr->client->name());
196 pm_strcpy(client_name, "**Dummy**");
198 bash_spaces(client_name);
199 pm_strcpy(fileset_name, jcr->fileset->name());
200 bash_spaces(fileset_name);
201 if (jcr->fileset->MD5[0] == 0) {
202 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
204 /* If rescheduling, cancel the previous incarnation of this job
205 * with the SD, which might be waiting on the FD connection.
206 * If we do not cancel it the SD will not accept a new connection
207 * for the same jobid.
209 if (jcr->reschedule_count) {
210 sd->fsend("cancel Job=%s\n", jcr->Job);
211 while (sd->recv() >= 0)
215 sd_client = jcr->sd_client;
216 if (jcr->sd_auth_key) {
217 bstrncpy(sd_auth_key, jcr->sd_auth_key, sizeof(sd_auth_key));
219 bstrncpy(sd_auth_key, "dummy", sizeof(sd_auth_key));
222 sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
223 job_name.c_str(), client_name.c_str(),
224 jcr->getJobType(), jcr->getJobLevel(),
225 fileset_name.c_str(), !jcr->pool->catalog_files,
226 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
227 jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
228 edit_int64(jcr->spool_size, ed2), jcr->rerunning,
229 jcr->VolSessionId, jcr->VolSessionTime, sd_client,
232 Dmsg1(100, ">stored: %s", sd->msg);
233 Dmsg2(100, "=== rstore=%p wstore=%p\n", rstore, wstore);
/*
 * NOTE(review): OKjob's Authorization conversion stores its characters plus
 * a terminating NUL into sd_auth_key[100], so its field width must be at
 * most 99.  Also, passing `sd_auth_key` rather than `&sd_auth_key` would
 * give sscanf the char * that %s expects.
 */
234 if (bget_dirmsg(sd) > 0) {
235 Dmsg1(100, "<stored: %s", sd->msg);
236 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
237 &jcr->VolSessionTime, &sd_auth_key) != 3) {
238 Dmsg1(100, "BadJob=%s\n", sd->msg);
239 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
242 bfree_and_null(jcr->sd_auth_key);
243 jcr->sd_auth_key = bstrdup(sd_auth_key);
244 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
247 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
252 if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
253 !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
258 * We have two loops here. The first comes from the
259 * Storage = associated with the Job, and we need
260 * to attach to each one.
261 * The inner loop loops over all the alternative devices
262 * associated with each Storage. It selects the first
266 /* Do read side of storage daemon */
268 /* For the moment, only migrate, copy and vbackup have rpool */
269 if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) ||
270 (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
271 pm_strcpy(pool_type, jcr->rpool->pool_type);
272 pm_strcpy(pool_name, jcr->rpool->name());
274 pm_strcpy(pool_type, jcr->pool->pool_type);
275 pm_strcpy(pool_name, jcr->pool->name());
277 bash_spaces(pool_type);
278 bash_spaces(pool_name);
279 foreach_alist(storage, rstore) {
280 Dmsg1(100, "Rstore=%s\n", storage->name());
281 pm_strcpy(store_name, storage->name());
282 bash_spaces(store_name);
283 if (jcr->media_type) {
284 pm_strcpy(media_type, jcr->media_type); /* user override */
286 pm_strcpy(media_type, storage->media_type);
288 bash_spaces(media_type);
289 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
290 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
291 Dmsg1(100, "rstore >stored: %s", sd->msg);
293 /* Loop over alternative storage Devices until one is OK */
294 foreach_alist(dev, storage->device) {
295 pm_strcpy(device_name, dev->name());
296 bash_spaces(device_name);
297 sd->fsend(use_device, device_name.c_str());
298 Dmsg1(100, ">stored: %s", sd->msg);
300 sd->signal(BNET_EOD); /* end of Devices */
302 sd->signal(BNET_EOD); /* end of Storages */
303 if (bget_dirmsg(sd) > 0) {
304 Dmsg1(100, "<stored: %s", sd->msg);
/*
 * NOTE(review): OK_device's device-name conversion writes into
 * device_name's pool buffer.  It needs a field width no larger than that
 * buffer can hold, or a long reply overflows it.
 */
305 /* ****FIXME**** save actual device name */
306 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
311 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to read.\n"), device_name.c_str());
315 /* Do write side of storage daemon */
317 pm_strcpy(pool_type, jcr->pool->pool_type);
318 pm_strcpy(pool_name, jcr->pool->name());
319 bash_spaces(pool_type);
320 bash_spaces(pool_name);
321 foreach_alist(storage, wstore) {
322 Dmsg1(100, "Wstore=%s\n", storage->name());
323 pm_strcpy(store_name, storage->name());
324 bash_spaces(store_name);
325 pm_strcpy(media_type, storage->media_type);
326 bash_spaces(media_type);
327 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
328 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
330 Dmsg1(100, "wstore >stored: %s", sd->msg);
332 /* Loop over alternative storage Devices until one is OK */
333 foreach_alist(dev, storage->device) {
334 pm_strcpy(device_name, dev->name());
335 bash_spaces(device_name);
336 sd->fsend(use_device, device_name.c_str());
337 Dmsg1(100, ">stored: %s", sd->msg);
339 sd->signal(BNET_EOD); /* end of Devices */
341 sd->signal(BNET_EOD); /* end of Storages */
342 if (bget_dirmsg(sd) > 0) {
343 Dmsg1(100, "<stored: %s", sd->msg);
/* NOTE(review): same device_name buffer-size concern as on the read side. */
344 /* ****FIXME**** save actual device name */
345 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
350 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to write.\n"), device_name.c_str());
356 pm_strcpy(err_msg, sd->msg); /* save message */
357 Jmsg(jcr, M_FATAL, 0, _("\n"
358 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
359 device_name.c_str(), err_msg.c_str()/* sd->msg */);
361 Jmsg(jcr, M_FATAL, 0, _("\n"
362 " Storage daemon didn't accept Device \"%s\" command.\n"),
363 device_name.c_str());
370 * Start a thread to handle Storage daemon messages and
/*
 * start_storage_daemon_message_thread -- spawn msg_thread() for jcr.
 *
 * Takes a JCR reference for the new thread (inc_use_count(); the thread's
 * cleanup handler calls free_jcr()).  It resets sd_msg_thread_done and
 * SD_msg_chan_started, then creates the thread.  A pthread_create() failure
 * is reported with M_ABORT.  The caller then loops while
 * SD_msg_chan_started is false; inside the loop it tests job_canceled() and
 * sd_msg_thread_done.  The action taken there and the loop's delay are not
 * visible in this listing.
 * NOTE(review): SD_msg_chan_started is written by the new thread and polled
 * here with no visible lock -- confirm the hidden loop body makes this safe.
 */
373 bool start_storage_daemon_message_thread(JCR *jcr)
378 jcr->inc_use_count(); /* mark in use by msg thread */
379 jcr->sd_msg_thread_done = false;
380 jcr->SD_msg_chan_started = false;
381 Dmsg0(150, "Start SD msg_thread.\n");
382 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
384 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
386 /* Wait for thread to start */
387 while (jcr->SD_msg_chan_started == false) {
389 if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
393 Dmsg1(150, "SD msg_thread started. use=%d\n", jcr->use_count());
/*
 * msg_thread_cleanup -- cleanup handler for msg_thread().
 *
 * msg_thread() registers it with pthread_cleanup_push() and runs it with
 * pthread_cleanup_pop(1).  It does the following in order:
 *  - ends any open database transaction;
 *  - marks the message thread as done and no longer started;
 *  - broadcasts jcr->term_wait to wake waiters such as
 *    wait_for_storage_daemon_termination();
 *  - removes the thread-specific DB data;
 *  - drops the JCR reference taken for this thread.
 * NOTE(review): listing lines 401 and 404 are not shown; they may lock
 * around the flag updates and broadcast -- confirm.
 */
397 extern "C" void msg_thread_cleanup(void *arg)
399 JCR *jcr = (JCR *)arg;
400 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
402 jcr->sd_msg_thread_done = true;
403 jcr->SD_msg_chan_started = false;
405 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
406 Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
407 db_thread_cleanup(jcr->db); /* remove thread specific data */
408 free_jcr(jcr); /* release jcr */
412 * Handle the message channel (i.e. requests from the
414 * Note, we are running in a separate thread.
/*
 * msg_thread -- body of the per-job Storage daemon message thread.
 * arg is the JCR; the thread is created by
 * start_storage_daemon_message_thread().
 *
 * Setup: the thread detaches itself, records itself in jcr->SD_msg_chan,
 * sets jcr->SD_msg_chan_started, and installs msg_thread_cleanup() as its
 * cleanup handler.
 *
 * Main loop: it reads jcr->store_bsock with bget_dirmsg() while the job is
 * not canceled and the result is >= 0.
 *  - Job_start messages are matched; their handling is not visible here.
 *  - A fully parsed Job_end message (6 fields) copies JobStatus, JobFiles,
 *    JobBytes and JobErrors into jcr->SDJobStatus, SDJobFiles, SDJobBytes
 *    and SDErrors.  The unbashed ErrMsg goes into jcr->StatusErrMsg.
 *
 * After the loop:
 *  - BNET_HARDEOF on a job that is not canceled queues an M_ERROR
 *    "connection ... was lost" message.
 *  - SDJobStatus becomes JS_Canceled for a canceled job, or
 *    JS_ErrorTerminated if the socket reports an error.
 *  - pthread_cleanup_pop(1) then runs the cleanup handler.
 *
 * NOTE(review): ErrMsg is read with a %s conversion, which needs at least
 * one non-blank character.  If the SD can send an empty ErrMsg field,
 * sscanf returns 5 and the Job_end status is silently ignored -- confirm
 * what the SD sends.
 * NOTE(review): the ErrMsg buffer declaration is not visible in this
 * listing; Job_end's ErrMsg field width must be smaller than that buffer.
 */
416 extern "C" void *msg_thread(void *arg)
418 JCR *jcr = (JCR *)arg;
422 char Job[MAX_NAME_LENGTH];
424 uint32_t JobFiles, JobErrors;
428 pthread_detach(pthread_self());
430 jcr->SD_msg_chan = pthread_self();
431 jcr->SD_msg_chan_started = true;
432 pthread_cleanup_push(msg_thread_cleanup, arg);
433 sd = jcr->store_bsock;
435 /* Read the Storage daemon's output.
437 Dmsg0(100, "Start msg_thread loop\n");
439 while (!job_canceled(jcr) && (n=bget_dirmsg(sd)) >= 0) {
440 Dmsg1(400, "<stored: %s", sd->msg);
441 if (sscanf(sd->msg, Job_start, Job) == 1) {
444 if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
445 &JobBytes, &JobErrors, ErrMsg) == 6) {
446 jcr->SDJobStatus = JobStatus; /* termination status */
447 jcr->SDJobFiles = JobFiles;
448 jcr->SDJobBytes = JobBytes;
449 jcr->SDErrors = JobErrors;
450 unbash_spaces(ErrMsg); /* Error message if any */
451 pm_strcpy(jcr->StatusErrMsg, ErrMsg);
454 Dmsg1(400, "end loop use=%d\n", jcr->use_count());
456 if (n == BNET_HARDEOF && jcr->getJobStatus() != JS_Canceled) {
458 * This probably should be M_FATAL, but I am not 100% sure
459 * that this return *always* corresponds to a dropped line.
461 Qmsg(jcr, M_ERROR, 0, _("Director's connection to SD for this Job was lost.\n"));
463 if (jcr->getJobStatus() == JS_Canceled) {
464 jcr->SDJobStatus = JS_Canceled;
465 } else if (sd->is_error()) {
466 jcr->SDJobStatus = JS_ErrorTerminated;
468 pthread_cleanup_pop(1); /* remove and execute the handler */
/*
 * wait_for_storage_daemon_termination -- block until msg_thread() has set
 * jcr->sd_msg_thread_done.
 *
 * Each pass waits on jcr->term_wait with the file-static mutex, using an
 * absolute timeout 5 seconds after the current time.
 * If the job is canceled while the message thread is running, the SD
 * socket is marked timed out and terminated, and the thread is signaled
 * with TIMEOUT_SIGNAL.  cancel_count is compared with 6, per the comment
 * "Give SD 30 seconds to clean up after cancel"; the action taken at 6 and
 * the increment are not visible in this listing.  The last visible
 * statement sets the job status to JS_Terminated.
 * NOTE(review): pthread_cond_timedwait() requires `mutex` to be locked by
 * the caller; the lock/unlock and the tv_nsec setup are not visible here --
 * confirm.
 */
472 void wait_for_storage_daemon_termination(JCR *jcr)
474 int cancel_count = 0;
475 /* Now wait for Storage daemon to terminate our message thread */
476 while (!jcr->sd_msg_thread_done) {
479 struct timespec timeout;
481 gettimeofday(&tv, &tz);
483 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
484 Dmsg0(400, "I'm waiting for message thread termination.\n");
486 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
488 if (jcr->is_canceled()) {
489 if (jcr->SD_msg_chan_started) {
490 jcr->store_bsock->set_timed_out();
491 jcr->store_bsock->set_terminated();
492 sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL);
496 /* Give SD 30 seconds to clean up after cancel */
497 if (cancel_count == 6) {
501 jcr->setJobStatus(JS_Terminated);
505 * Send bootstrap file to Storage daemon.
506 * This is used for restore, verify VolumeToCatalog, migration,
/*
 * send_bootstrap_file -- stream the file named by jcr->RestoreBootstrap to
 * the daemon on `sd`.
 *
 * It sends a "bootstrap\n" header, then each line read with fgets(), then
 * BNET_EOD.  A NULL jcr->RestoreBootstrap takes an early-exit branch whose
 * return value is not visible here.  If the file cannot be opened, the job
 * gets an M_FATAL message and status JS_ErrorTerminated.  When
 * jcr->unlink_bsr is set, the file is deleted afterwards and the flag is
 * cleared.
 * NOTE(review): the Dmsg1 below formats jcr->RestoreBootstrap with %s
 * before the NULL check.  Passing NULL to %s is undefined for standard
 * printf-family functions unless Bacula's Dmsg formatter tolerates it, so
 * consider moving the trace after the check.
 * NOTE(review): the fclose() of `bs` and the return statements are not
 * visible in this listing -- confirm the file is closed on every path.
 */
509 bool send_bootstrap_file(JCR *jcr, BSOCK *sd)
513 const char *bootstrap = "bootstrap\n";
515 Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap);
516 if (!jcr->RestoreBootstrap) {
519 bs = bfopen(jcr->RestoreBootstrap, "rb");
522 Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
523 jcr->RestoreBootstrap, be.bstrerror());
524 jcr->setJobStatus(JS_ErrorTerminated);
527 sd->fsend(bootstrap);
528 while (fgets(buf, sizeof(buf), bs)) {
529 sd->fsend("%s", buf);
531 sd->signal(BNET_EOD);
533 if (jcr->unlink_bsr) {
534 unlink(jcr->RestoreBootstrap);
535 jcr->unlink_bsr = false;
/*
 * device_thread -- detached worker that asks the Storage daemon for
 * information on every Device resource.
 *
 * It creates a control JCR named "*DeviceInit*" (JT_SYSTEM).  Then, in a
 * loop of up to MAX_TRIES iterations, it connects with
 * connect_to_storage_daemon(jcr, 10, 30, 1) and calls update_device_res()
 * for each R_DEVICE resource, logging each outcome at debug level 900.
 * jcr->store_bsock is then released with free_bsock().
 * NOTE(review): the loop control after a failed connect, the handling of
 * update failures, and the release of the control JCR are not visible in
 * this listing -- confirm against the full source.
 * NOTE(review): update_device_res() uses query_device, which is commented
 * out near the top of this file; this code may be compiled out by a
 * conditional not shown here.
 */
544 extern "C" void *device_thread(void *arg)
551 pthread_detach(pthread_self());
552 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
553 for (i=0; i < MAX_TRIES; i++) {
554 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
555 Dmsg0(900, "Failed connecting to SD.\n");
559 foreach_res(dev, R_DEVICE) {
560 if (!update_device_res(jcr, dev)) {
561 Dmsg1(900, "Error updating device=%s\n", dev->name());
563 Dmsg1(900, "Updated Device=%s\n", dev->name());
567 free_bsock(jcr->store_bsock);
576 * Start a thread to handle getting Device resource information
577 * from SD. This is called once at startup of the Director.
579 void init_device_resources()
584 Dmsg0(100, "Start Device thread.\n");
585 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
587 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));