2 Bacula(R) - The Network Backup Solution
4 Copyright (C) 2000-2015 Kern Sibbald
6 The original author of Bacula is Kern Sibbald, with contributions
7 from many others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 This notice must be preserved when any source code is
15 conveyed and/or propagated.
17 Bacula(R) is a registered trademark of Kern Sibbald.
21 * Bacula Director -- msgchan.c -- handles the message channel
22 * to the Storage daemon and the File daemon.
24 * Written by Kern Sibbald, August MM
26 * This routine runs as a thread and must be thread reentrant.
28 * Basic tasks done here:
29 * Open a message channel with the Storage daemon
30 * to authenticate ourselves and to pass the JobId.
31 * Create a thread to interact with the Storage daemon,
32 * which returns a job status and requests Catalog services, etc.
/*
 * File-scope mutex handed to pthread_cond_timedwait() on jcr->term_wait in
 * wait_for_storage_daemon_termination().
 */
39 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
41 /* Commands sent to Storage daemon */
/*
 * jobcmd: opening command of an SD job session, filled in by
 * start_storage_daemon_job() (JobId, job/client names, type/level, FileSet,
 * spooling options, VolSessionId/Time, sd_client, ...).
 * NOTE(review): the final continuation line of this literal is missing from
 * this extract; the caller passes one more argument after sd_client
 * (presumably an Authorization=%s field) -- confirm against the full file.
 */
42 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
43 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
44 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s "
45 "rerunning=%d VolSessionId=%d VolSessionTime=%d sd_client=%d "
/*
 * use_storage: announces one Storage resource to the SD.  The caller passes
 * append=0 for the read side and append=1 for the write side.
 */
47 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
48 "pool_type=%s append=%d copy=%d stripe=%d\n";
/* use_device: one candidate Device of the Storage just announced; the list is ended with BNET_EOD. */
49 static char use_device[] = "use device=%s\n";
/*
 * NOTE(review): query_device is commented out here, yet update_device_res()
 * still passes query_device to fsend(); presumably that function is compiled
 * out or the name is defined elsewhere -- confirm.
 */
50 //static char query_device[] = _("query device=%s");
/* Response from Storage daemon */
/*
 * Reply to jobcmd: SD session id, session time and the authorization key
 * (the caller stores the key in jcr->sd_auth_key).  The key is scanned into
 * a char[100] buffer (sd_auth_key in start_storage_daemon_job()), so the %s
 * field width must be at most 99: sscanf() appends a terminating NUL after
 * the field, and a width of 100 could write one byte past the buffer.
 */
static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%99s\n";
/*
 * Reply to the use_storage/use_device exchange, carrying the name of the
 * device the SD accepted.  NOTE(review): %s is unbounded here; callers must
 * scan into a buffer large enough for any device name the SD may echo.
 */
static char OK_device[] = "3000 OK use device device=%s\n";

/* Storage Daemon requests */
/*
 * Job name fields use %127s (at most 127 chars + NUL), so the receiving
 * buffer must hold at least 128 bytes.  NOTE(review): msg_thread() scans into
 * char Job[MAX_NAME_LENGTH]; confirm MAX_NAME_LENGTH >= 128.
 */
static char Job_start[] = "3010 Job %127s start\n";
static char Job_end[] =
   "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";
61 /* Forward referenced functions */
/* Entry point of the Storage daemon message-channel thread; defined later in this file. */
62 extern "C" void *msg_thread(void *arg);
/*
 * Return the console job's socket to its write Storage daemon
 * (ua->jcr->wstore), connecting first when ua->jcr->store_bsock is not open.
 * Progress and failure messages are sent to the user agent.
 * NOTE(review): the failure-path return and closing braces are not visible
 * in this extract; presumably NULL is returned when the connect fails.
 */
64 BSOCK *open_sd_bsock(UAContext *ua)
66 STORE *store = ua->jcr->wstore;
68 if (!is_bsock_open(ua->jcr->store_bsock)) {
69 ua->send_msg(_("Connecting to Storage daemon %s at %s:%d ...\n"),
70 store->name(), store->address, store->SDport);
/* retry every 10 s, give up after SDConnectTimeout, verbose */
71 if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
72 ua->error_msg(_("Failed to connect to Storage daemon.\n"));
76 return ua->jcr->store_bsock;
/*
 * If the console job has a Storage daemon socket, send BNET_TERMINATE to the
 * SD and release the socket with free_bsock().
 * NOTE(review): whether free_bsock() also NULLs ua->jcr->store_bsock is not
 * visible here -- confirm before relying on the pointer afterwards.
 */
79 void close_sd_bsock(UAContext *ua)
81 if (ua->jcr->store_bsock) {
82 ua->jcr->store_bsock->signal(BNET_TERMINATE);
83 free_bsock(ua->jcr->store_bsock);
88 * Establish a message channel connection with the Storage daemon
89 * and perform authentication.
/*
 * retry_interval / max_retry_time / verbose are passed straight through to
 * BSOCK::connect().  Returns true immediately if jcr->store_bsock is already
 * open.  On a successful connect the socket is saved in jcr->store_bsock and
 * authenticate_storage_daemon() is run.
 * NOTE(review): the store selection (after "If there is a write storage use
 * it"), the creation of a new bsock when jcr->store_bsock is NULL, and the
 * failure returns are not visible in this extract.
 */
91 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
92 int max_retry_time, int verbose)
94 BSOCK *sd = jcr->store_bsock;
98 if (is_bsock_open(sd)) {
99 return true; /* already connected */
105 /* If there is a write storage use it */
/* Heartbeat: the Storage resource's interval wins; otherwise use the Director's. */
112 if (store->heartbeat_interval) {
113 heart_beat = store->heartbeat_interval;
115 heart_beat = director->heartbeat_interval;
119 * Open message channel with the Storage daemon
121 Dmsg2(100, "Connect to Storage daemon %s:%d\n", store->address,
123 sd->set_source_address(director->DIRsrc_addr);
124 if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
125 store->address, NULL, store->SDport, verbose)) {
127 if (!jcr->store_bsock) { /* The bsock was locally created, so we free it here */
136 sd->res = (RES *)store; /* save pointer to other end */
137 jcr->store_bsock = sd;
139 if (!authenticate_storage_daemon(jcr, store)) {
147 * Here we ask the SD to send us the info for a
148 * particular device resource.
/*
 * Connects (5 s retry interval, 30 s max, quiet), sends the space-escaped
 * device name with query_device and reads one reply with bget_dirmsg().
 * NOTE(review): query_device is commented out at file scope in the visible
 * text, so this function presumably is compiled out (or the name is defined
 * elsewhere) -- confirm.  Return statements are not visible in this extract.
 */
151 bool update_device_res(JCR *jcr, DEVICE *dev)
153 POOL_MEM device_name;
155 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
158 sd = jcr->store_bsock;
159 pm_strcpy(device_name, dev->name());
160 bash_spaces(device_name);
161 sd->fsend(query_device, device_name.c_str());
162 Dmsg1(100, ">stored: %s\n", sd->msg);
163 /* The data is returned through Device_update */
164 if (bget_dirmsg(sd) <= 0) {
/* Expected SD reply after the bootstrap file has been sent. */
171 static char OKbootstrap[] = "3000 OK bootstrap\n";
174 * Start a job with the Storage daemon
/*
 * Protocol on jcr->store_bsock:
 *  1. If the job is being rescheduled, send "cancel Job=..." and drain replies.
 *  2. Send jobcmd, parse the OKjob reply into jcr->VolSessionId,
 *     jcr->VolSessionTime and a freshly bstrdup'ed jcr->sd_auth_key.
 *  3. If send_bsr, send the bootstrap file and expect OKbootstrap.
 *  4. For each Storage in rstore send use_storage (append=0) plus one
 *     use_device per Device, then for wstore the same with append=1; each
 *     Device list and the Storage list are terminated with BNET_EOD, and an
 *     OK_device reply is expected for each side.
 * Names are passed through bash_spaces() before sending (presumably so that
 * embedded spaces survive the SD's %s parsing).
 * NOTE(review): many lines (declarations of sd/storage/dev/ok/copy/stripe,
 * NULL checks on rstore/wstore, else branches, return statements) are missing
 * from this extract; presumably the function returns the final "ok".
 */
176 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
181 char sd_auth_key[100];
182 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
183 POOL_MEM job_name, client_name, fileset_name;
186 char ed1[30], ed2[30];
189 sd = jcr->store_bsock;
191 * Now send JobId and permissions, and get back the authorization key.
193 pm_strcpy(job_name, jcr->job->name());
194 bash_spaces(job_name);
195 
/* Placeholder client name, presumably used when the job has no jcr->client. */
196 pm_strcpy(client_name, jcr->client->name());
198 pm_strcpy(client_name, "**Dummy**");
200 bash_spaces(client_name);
201 pm_strcpy(fileset_name, jcr->fileset->name());
202 bash_spaces(fileset_name);
/* Never send an empty FileSetMD5 token; substitute a placeholder. */
203 if (jcr->fileset->MD5[0] == 0) {
204 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
206 /* If rescheduling, cancel the previous incarnation of this job
207 * with the SD, which might be waiting on the FD connection.
208 * If we do not cancel it the SD will not accept a new connection
209 * for the same jobid.
211 if (jcr->reschedule_count) {
212 sd->fsend("cancel Job=%s\n", jcr->Job);
213 while (sd->recv() >= 0)
/* Send the previous key if we already have one, otherwise a placeholder. */
217 sd_client = jcr->sd_client;
218 if (jcr->sd_auth_key) {
219 bstrncpy(sd_auth_key, jcr->sd_auth_key, sizeof(sd_auth_key));
221 bstrncpy(sd_auth_key, "dummy", sizeof(sd_auth_key));
224 sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
225 job_name.c_str(), client_name.c_str(),
226 jcr->getJobType(), jcr->getJobLevel(),
227 fileset_name.c_str(), !jcr->pool->catalog_files,
228 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
229 jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
230 edit_int64(jcr->spool_size, ed2), jcr->rerunning,
231 jcr->VolSessionId, jcr->VolSessionTime, sd_client,
234 Dmsg1(100, ">stored: %s", sd->msg);
235 Dmsg2(100, "=== rstore=%p wstore=%p\n", rstore, wstore);
236 if (bget_dirmsg(sd) > 0) {
237 Dmsg1(100, "<stored: %s", sd->msg);
/*
 * NOTE(review): the new key is scanned into the local char[100]; verify that
 * OKjob's Authorization field width is at most sizeof(sd_auth_key) - 1.
 * Also &sd_auth_key has type char (*)[100] where %s expects char *; passing
 * sd_auth_key would match the format exactly.
 */
238 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
239 &jcr->VolSessionTime, &sd_auth_key) != 3) {
240 Dmsg1(100, "BadJob=%s\n", sd->msg);
241 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
/* Replace any previous key with the one issued by the SD. */
244 bfree_and_null(jcr->sd_auth_key);
245 jcr->sd_auth_key = bstrdup(sd_auth_key);
246 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
249 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
254 if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
255 !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
260 * We have two loops here. The first comes from the
261 * Storage = associated with the Job, and we need
262 * to attach to each one.
263 * The inner loop loops over all the alternative devices
264 * associated with each Storage. It selects the first
268 /* Do read side of storage daemon */
270 /* For the moment, only migrate, copy and vbackup have rpool */
271 if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) ||
272 (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
273 pm_strcpy(pool_type, jcr->rpool->pool_type);
274 pm_strcpy(pool_name, jcr->rpool->name());
276 pm_strcpy(pool_type, jcr->pool->pool_type);
277 pm_strcpy(pool_name, jcr->pool->name());
279 bash_spaces(pool_type);
280 bash_spaces(pool_name);
281 foreach_alist(storage, rstore) {
282 Dmsg1(100, "Rstore=%s\n", storage->name());
283 pm_strcpy(store_name, storage->name());
284 bash_spaces(store_name);
285 if (jcr->media_type) {
286 pm_strcpy(media_type, jcr->media_type); /* user override */
288 pm_strcpy(media_type, storage->media_type);
290 bash_spaces(media_type);
/* append=0: this Storage is used for reading. */
291 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
292 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
293 Dmsg1(100, "rstore >stored: %s", sd->msg);
295 /* Loop over alternative storage Devices until one is OK */
296 foreach_alist(dev, storage->device) {
297 pm_strcpy(device_name, dev->name());
298 bash_spaces(device_name);
299 sd->fsend(use_device, device_name.c_str());
300 Dmsg1(100, ">stored: %s", sd->msg);
302 sd->signal(BNET_EOD); /* end of Devices */
304 sd->signal(BNET_EOD); /* end of Storages */
305 if (bget_dirmsg(sd) > 0) {
306 Dmsg1(100, "<stored: %s", sd->msg);
307 /* ****FIXME**** save actual device name */
/*
 * NOTE(review): OK_device's %s is unbounded and writes into the POOL_MEM
 * buffer behind device_name.c_str(); a device name longer than that buffer
 * would overflow it.
 */
308 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
313 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to read.\n"), device_name.c_str());
317 /* Do write side of storage daemon */
/*
 * The write side always uses jcr->pool and each Storage's own media_type.
 * NOTE(review): unlike the read side, no jcr->media_type override is applied
 * here -- confirm this is intended.
 */
319 pm_strcpy(pool_type, jcr->pool->pool_type);
320 pm_strcpy(pool_name, jcr->pool->name());
321 bash_spaces(pool_type);
322 bash_spaces(pool_name);
323 foreach_alist(storage, wstore) {
324 Dmsg1(100, "Wstore=%s\n", storage->name());
325 pm_strcpy(store_name, storage->name());
326 bash_spaces(store_name);
327 pm_strcpy(media_type, storage->media_type);
328 bash_spaces(media_type);
/* append=1: this Storage is used for writing. */
329 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
330 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
332 Dmsg1(100, "wstore >stored: %s", sd->msg);
334 /* Loop over alternative storage Devices until one is OK */
335 foreach_alist(dev, storage->device) {
336 pm_strcpy(device_name, dev->name());
337 bash_spaces(device_name);
338 sd->fsend(use_device, device_name.c_str());
339 Dmsg1(100, ">stored: %s", sd->msg);
341 sd->signal(BNET_EOD); /* end of Devices */
343 sd->signal(BNET_EOD); /* end of Storages */
344 if (bget_dirmsg(sd) > 0) {
345 Dmsg1(100, "<stored: %s", sd->msg);
346 /* ****FIXME**** save actual device name */
347 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
352 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to write.\n"), device_name.c_str());
/* Failure reporting: include the SD's reply text when one was received. */
358 pm_strcpy(err_msg, sd->msg); /* save message */
359 Jmsg(jcr, M_FATAL, 0, _("\n"
360 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
361 device_name.c_str(), err_msg.c_str()/* sd->msg */);
363 Jmsg(jcr, M_FATAL, 0, _("\n"
364 " Storage daemon didn't accept Device \"%s\" command.\n"),
365 device_name.c_str());
372 * Start a thread to handle Storage daemon messages and
/*
 * Takes a JCR reference for the new thread (inc_use_count), resets the
 * start/done flags, creates msg_thread() and then polls until the thread
 * reports SD_msg_chan_started.  The polling stops early if the job is
 * canceled or the thread has already finished.  Thread-creation failure is
 * reported with M_ABORT.
 * NOTE(review): the loop body (presumably a short sleep) and the return
 * statements are not visible here.  SD_msg_chan_started is read here and
 * written by the thread without a visible lock -- confirm it is safe to poll.
 */
375 bool start_storage_daemon_message_thread(JCR *jcr)
380 jcr->inc_use_count(); /* mark in use by msg thread */
381 jcr->sd_msg_thread_done = false;
382 jcr->SD_msg_chan_started = false;
383 Dmsg0(150, "Start SD msg_thread.\n");
384 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
386 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
388 /* Wait for thread to start */
389 while (jcr->SD_msg_chan_started == false) {
391 if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
395 Dmsg1(150, "SD msg_thread started. use=%d\n", jcr->use_count());
/*
 * pthread cleanup handler for msg_thread(); arg is the job's JCR.
 * Ends any open catalog transaction and marks the thread done/not started.
 * It then wakes waiters on jcr->term_wait, drops per-thread DB data and
 * releases the JCR reference (presumably the one taken with inc_use_count()
 * when the thread was started).
 * NOTE(review): lines between the flag updates are missing from this
 * extract; a JCR lock around them may exist in the full file.
 */
399 extern "C" void msg_thread_cleanup(void *arg)
401 JCR *jcr = (JCR *)arg;
402 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
404 jcr->sd_msg_thread_done = true;
405 jcr->SD_msg_chan_started = false;
407 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
408 Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
409 db_thread_cleanup(jcr->db); /* remove thread specific data */
/* Must be last: jcr may be destroyed here. */
410 free_jcr(jcr); /* release jcr */
414 * Handle the message channel (i.e. requests from the
416 * Note, we are running in a separate thread.
/*
 * Detached thread that reads jcr->store_bsock until the job is canceled or
 * bget_dirmsg() returns a negative value.  It records Job_end statistics
 * into jcr->SDJobStatus/SDJobFiles/SDJobBytes/SDErrors.  On exit it sets
 * SDJobStatus from the job/socket state.  msg_thread_cleanup() runs via
 * pthread_cleanup_pop(1).
 * NOTE(review): JobFiles is uint32_t but Job_end scans it with %d; the
 * declarations of sd, n, JobStatus and JobBytes are not visible here.
 */
418 extern "C" void *msg_thread(void *arg)
420 JCR *jcr = (JCR *)arg;
424 char Job[MAX_NAME_LENGTH];
425 uint32_t JobFiles, JobErrors;
428 pthread_detach(pthread_self());
/* Publish our thread id and tell the starter we are running. */
430 jcr->SD_msg_chan = pthread_self();
431 jcr->SD_msg_chan_started = true;
432 pthread_cleanup_push(msg_thread_cleanup, arg);
433 sd = jcr->store_bsock;
435 /* Read the Storage daemon's output.
437 Dmsg0(100, "Start msg_thread loop\n");
439 while (!job_canceled(jcr) && (n=bget_dirmsg(sd)) >= 0) {
440 Dmsg1(400, "<stored: %s", sd->msg);
441 if (sscanf(sd->msg, Job_start, Job) == 1) {
444 if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
445 &JobBytes, &JobErrors) == 5) {
446 jcr->SDJobStatus = JobStatus; /* termination status */
447 jcr->SDJobFiles = JobFiles;
448 jcr->SDJobBytes = JobBytes;
449 jcr->SDErrors = JobErrors;
452 Dmsg1(400, "end loop use=%d\n", jcr->use_count());
/* A hard EOF on a job that was not canceled is reported as a lost connection. */
454 if (n == BNET_HARDEOF && jcr->getJobStatus() != JS_Canceled) {
456 * This probably should be M_FATAL, but I am not 100% sure
457 * that this return *always* corresponds to a dropped line.
459 Qmsg(jcr, M_ERROR, 0, _("Director's connection to SD for this Job was lost.\n"));
461 if (jcr->getJobStatus() == JS_Canceled) {
462 jcr->SDJobStatus = JS_Canceled;
463 } else if (sd->is_error()) {
464 jcr->SDJobStatus = JS_ErrorTerminated;
466 pthread_cleanup_pop(1); /* remove and execute the handler */
/*
 * Block until msg_thread() sets jcr->sd_msg_thread_done.  The wait is done in
 * 5-second timed waits on jcr->term_wait.  If the job is canceled while the
 * channel is running, the SD socket is marked timed-out/terminated and the
 * thread is signalled with TIMEOUT_SIGNAL.  cancel_count == 6 (6 x 5 s)
 * bounds the post-cancel wait to about 30 s.
 * NOTE(review): POSIX requires `mutex` to be locked by this thread around
 * pthread_cond_timedwait(); the lock/unlock lines, tv_nsec setup and
 * cancel_count increment are not visible in this extract.
 * msg_thread_cleanup() broadcasts without visibly holding `mutex`, so a
 * wakeup can be missed; the 5 s timeout bounds the resulting delay.
 */
470 void wait_for_storage_daemon_termination(JCR *jcr)
472 int cancel_count = 0;
473 /* Now wait for Storage daemon to terminate our message thread */
474 while (!jcr->sd_msg_thread_done) {
477 struct timespec timeout;
479 gettimeofday(&tv, &tz);
481 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
482 Dmsg0(400, "I'm waiting for message thread termination.\n");
484 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
486 if (jcr->is_canceled()) {
487 if (jcr->SD_msg_chan_started) {
488 jcr->store_bsock->set_timed_out();
489 jcr->store_bsock->set_terminated();
490 sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL);
494 /* Give SD 30 seconds to clean up after cancel */
495 if (cancel_count == 6) {
499 jcr->setJobStatus(JS_Terminated);
503 * Send bootstrap file to Storage daemon.
504 * This is used for restore, verify VolumeToCatalog, migration,
/*
 * Sends "bootstrap\n", then each line of jcr->RestoreBootstrap as read by
 * fgets(), then BNET_EOD.  If the file cannot be opened, a fatal job message
 * is emitted and the job status is set to JS_ErrorTerminated.  When
 * jcr->unlink_bsr is set, the file is deleted afterwards and the flag is
 * cleared.
 * NOTE(review): the Dmsg1 below formats RestoreBootstrap with %s before the
 * NULL check; passing NULL to %s is undefined behaviour on some platforms.
 * fclose(bs) and the return statements are not visible in this extract.
 */
507 bool send_bootstrap_file(JCR *jcr, BSOCK *sd)
511 const char *bootstrap = "bootstrap\n";
513 Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap);
514 if (!jcr->RestoreBootstrap) {
517 bs = fopen(jcr->RestoreBootstrap, "rb");
520 Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
521 jcr->RestoreBootstrap, be.bstrerror());
522 jcr->setJobStatus(JS_ErrorTerminated);
525 sd->fsend(bootstrap);
/* Lines longer than buf are sent in several fgets()-sized pieces. */
526 while (fgets(buf, sizeof(buf), bs)) {
527 sd->fsend("%s", buf);
529 sd->signal(BNET_EOD);
531 if (jcr->unlink_bsr) {
532 unlink(jcr->RestoreBootstrap);
533 jcr->unlink_bsr = false;
/*
 * Detached thread body for init_device_resources().  It creates a control
 * JCR ("*DeviceInit*", JT_SYSTEM) and makes up to MAX_TRIES attempts to
 * connect to a Storage daemon (10 s retry interval, 30 s max, verbose).  On
 * a successful connection it calls update_device_res() for every R_DEVICE
 * resource, then frees the SD socket.
 * NOTE(review): the loop control after a failed/successful attempt, JCR
 * release and return are not visible in this extract; which Storage a
 * control JCR connects to is also not shown here.
 */
542 extern "C" void *device_thread(void *arg)
549 pthread_detach(pthread_self());
550 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
551 for (i=0; i < MAX_TRIES; i++) {
552 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
553 Dmsg0(900, "Failed connecting to SD.\n");
557 foreach_res(dev, R_DEVICE) {
558 if (!update_device_res(jcr, dev)) {
559 Dmsg1(900, "Error updating device=%s\n", dev->name());
561 Dmsg1(900, "Updated Device=%s\n", dev->name());
565 free_bsock(jcr->store_bsock);
574 * Start a thread to handle getting Device resource information
575 * from SD. This is called once at startup of the Director.
/*
 * Spawns device_thread() with no argument; failure to create the thread is
 * reported with M_ABORT.
 * NOTE(review): the abort text says "message thread" although the thread
 * created here is device_thread (same string as in
 * start_storage_daemon_message_thread); consider a distinct message.
 */
577 void init_device_resources()
582 Dmsg0(100, "Start Device thread.\n");
583 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
585 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));