2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2014 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from many
7 others, a complete list can be found in the file AUTHORS.
9 You may use this file and others of this release according to the
10 license defined in the LICENSE file, which includes the Affero General
11 Public License, v3.0 ("AGPLv3") and some additional permissions and
12 terms pursuant to its AGPLv3 Section 7.
14 Bacula® is a registered trademark of Kern Sibbald.
18 * Bacula Director -- msgchan.c -- handles the message channel
19 * to the Storage daemon and the File daemon.
21 * Written by Kern Sibbald, August MM
23 * This routine runs as a thread and must be thread reentrant.
25 * Basic tasks done here:
26 * Open a message channel with the Storage daemon
27 * to authenticate ourselves and to pass the JobId.
28 * Create a thread to interact with the Storage daemon,
29 * which returns a job status and requests Catalog services, etc.
/*
 * File-scope mutex paired with jcr->term_wait in the
 * pthread_cond_timedwait() call of wait_for_storage_daemon_termination().
 */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
38 /* Commands sent to Storage daemon */
// jobcmd: sent by start_storage_daemon_job() to register the job with the
// Storage daemon. Its arguments come from the JCR: JobId, Job, job/client
// names, job type and level, FileSet name, spooling options, FileSet MD5,
// spool size, rerun flag, volume session id/time and sd_client.
// NOTE(review): the final piece of this format string (and its closing `;`)
// is not visible in this excerpt -- confirm it matches the remaining
// sd->fsend() arguments in start_storage_daemon_job().
39 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
40 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
41 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s "
42 "rerunning=%d VolSessionId=%d VolSessionTime=%d sd_client=%d "
/*
 * "use storage" request: storage name, media type, pool name and pool
 * type, then the append flag (0 for the read side, 1 for the write side
 * in start_storage_daemon_job()), copy and stripe.
 */
static char use_storage[] =
   "use storage=%s media_type=%s "
   "pool_name=%s pool_type=%s "
   "append=%d copy=%d stripe=%d\n";

/* Names one candidate Device following a "use storage" request */
static char use_device[] = "use device=%s\n";

/* Disabled; note that update_device_res() still refers to query_device. */
//static char query_device[] = _("query device=%s");
/* Responses from the Storage daemon */

/*
 * Reply to jobcmd: SD session id, session time and the authorization key.
 * start_storage_daemon_job() scans the key into `char sd_auth_key[100]`,
 * so the field width must be at most 99 to leave room for the
 * terminating NUL (a width of 100 could write 101 bytes).
 */
static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%99s\n";

/*
 * Reply to a "use device" request.
 * NOTE: %s is unbounded; the destination buffer must be at least as
 * large as the message being scanned.
 */
static char OK_device[] = "3000 OK use device device=%s\n";
/* Storage Daemon requests */

/*
 * Job start notice. The job name is scanned into
 * `char Job[MAX_NAME_LENGTH]` in msg_thread(); the %127s width assumes
 * MAX_NAME_LENGTH >= 128 -- TODO confirm.
 */
static char Job_start[] = "3010 Job %127s start\n";

/*
 * End-of-job report. msg_thread() scans JobFiles and JobErrors into
 * uint32_t variables, so both use %u (scanf's unsigned int conversion).
 * %d would require an int * and is undefined for values above INT_MAX.
 */
static char Job_end[] =
   "3099 Job %127s end JobStatus=%d JobFiles=%u JobBytes=%lld JobErrors=%u\n";
/* Forward references: thread entry point defined later in this file */
extern "C" {
void *msg_thread(void *arg);
}
// Return the JCR's Storage daemon socket for a console (UA) command.
// If ua->jcr->store_bsock is not open yet, it announces the connection and
// connects to the JCR's write storage (jcr->wstore) first.
// A failed connect is reported with ua->error_msg().
// NOTE(review): the return statement on the failure path is not visible in
// this excerpt; presumably it returns NULL -- confirm.
61 BSOCK *open_sd_bsock(UAContext *ua)
63 STORE *store = ua->jcr->wstore;
65 if (!is_bsock_open(ua->jcr->store_bsock)) {
66 ua->send_msg(_("Connecting to Storage daemon %s at %s:%d ...\n"),
67 store->name(), store->address, store->SDport);
// retry_interval=10, max_retry_time=SDConnectTimeout, verbose=1
68 if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
69 ua->error_msg(_("Failed to connect to Storage daemon.\n"));
73 return ua->jcr->store_bsock;
// If the JCR has a Storage daemon socket, send it a BNET_TERMINATE signal
// and release it with free_bsock().
// NOTE(review): whether free_bsock() also resets ua->jcr->store_bsock to
// NULL depends on its definition (not shown here) -- confirm.
76 void close_sd_bsock(UAContext *ua)
78 if (ua->jcr->store_bsock) {
79 ua->jcr->store_bsock->signal(BNET_TERMINATE);
80 free_bsock(ua->jcr->store_bsock);
85 * Establish a message channel connection with the Storage daemon
86 * and perform authentication.
88 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
89 int max_retry_time, int verbose)
// retry_interval, max_retry_time and verbose are passed unchanged to
// BSOCK::connect(). Returns true immediately if jcr->store_bsock is already
// open; no reconnect is attempted in that case.
// After a successful connect the socket is stored in jcr->store_bsock, with
// sd->res pointing at the STORE used, and then authenticate_storage_daemon()
// is called.
// NOTE(review): the return statements for connect/authentication failure are
// not visible in this excerpt; presumably they return false -- confirm.
91 BSOCK *sd = jcr->store_bsock;
95 if (is_bsock_open(sd)) {
96 return true; /* already connected */
102 /* If there is a write storage use it */
// NOTE(review): the lines choosing `store` (write vs. read storage) and
// allocating `sd` when it is NULL are not visible in this excerpt.
// Heartbeat: the storage's own heartbeat_interval if set, otherwise the
// Director's.
109 if (store->heartbeat_interval) {
110 heart_beat = store->heartbeat_interval;
112 heart_beat = director->heartbeat_interval;
116 * Open message channel with the Storage daemon
118 Dmsg2(100, "Connect to Storage daemon %s:%d\n", store->address,
120 sd->set_source_address(director->DIRsrc_addr);
121 if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
122 store->address, NULL, store->SDport, verbose)) {
129 sd->res = (RES *)store; /* save pointer to other end */
130 jcr->store_bsock = sd;
132 if (!authenticate_storage_daemon(jcr, store)) {
140 * Here we ask the SD to send us the info for a
141 * particular device resource.
144 bool update_device_res(JCR *jcr, DEVICE *dev)
// Connects to the SD if needed (retry_interval=5, max_retry_time=30,
// verbose=0). It then sends the bash_spaces()-escaped device name with the
// query_device command and reads one reply with bget_dirmsg().
// NOTE(review): the only visible declaration of query_device (near the top
// of this file) is commented out, and `sd` is not declared in the visible
// lines. This function is presumably compiled out or incomplete -- confirm.
146 POOL_MEM device_name;
148 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
151 sd = jcr->store_bsock;
152 pm_strcpy(device_name, dev->name());
153 bash_spaces(device_name);
154 sd->fsend(query_device, device_name.c_str());
155 Dmsg1(100, ">stored: %s\n", sd->msg);
156 /* The data is returned through Device_update */
157 if (bget_dirmsg(sd) <= 0) {
/* Reply expected from the SD after send_bootstrap_file() */
static char OKbootstrap[] =
   "3000 OK "
   "bootstrap\n";
167 * Start a job with the Storage daemon
169 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
// Registers the job with the Storage daemon over jcr->store_bsock:
//  1. Sends jobcmd and parses the OKjob reply (volume session id/time and
//     an authorization key, which is saved in jcr->sd_auth_key).
//  2. Optionally (send_bsr) sends the bootstrap file and expects OKbootstrap.
//  3. For each read storage (rstore, append=0) and each write storage
//     (wstore, append=1), sends "use storage" followed by one "use device"
//     line per alternative Device. It then reads the OK_device reply.
// Names, pool names/types, media types and device names go through
// bash_spaces() before being sent.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably false on each fatal error path and true on success -- confirm.
174 char sd_auth_key[100];
175 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
176 POOL_MEM job_name, client_name, fileset_name;
179 char ed1[30], ed2[30];
182 sd = jcr->store_bsock;
184 * Now send JobId and permissions, and get back the authorization key.
186 pm_strcpy(job_name, jcr->job->name());
187 bash_spaces(job_name);
188 pm_strcpy(client_name, jcr->client->name());
189 bash_spaces(client_name);
190 pm_strcpy(fileset_name, jcr->fileset->name());
191 bash_spaces(fileset_name);
// An empty FileSet MD5 is replaced by the placeholder "**Dummy**".
192 if (jcr->fileset->MD5[0] == 0) {
193 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
195 /* If rescheduling, cancel the previous incarnation of this job
196 * with the SD, which might be waiting on the FD connection.
197 * If we do not cancel it the SD will not accept a new connection
198 * for the same jobid.
// Replies to the cancel are read until recv() returns a negative value
// (the loop body is not visible in this excerpt).
200 if (jcr->reschedule_count) {
201 sd->fsend("cancel Job=%s\n", jcr->Job);
202 while (sd->recv() >= 0)
206 sd_client = jcr->sd_client;
// Start from the JCR's existing SD authorization key if any, else "dummy".
207 if (jcr->sd_auth_key) {
208 bstrncpy(sd_auth_key, jcr->sd_auth_key, sizeof(sd_auth_key));
210 bstrncpy(sd_auth_key, "dummy", sizeof(sd_auth_key));
// NoAttr is sent as the inverse of the pool's catalog_files setting.
213 sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
214 job_name.c_str(), client_name.c_str(),
215 jcr->getJobType(), jcr->getJobLevel(),
216 fileset_name.c_str(), !jcr->pool->catalog_files,
217 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
218 jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
219 edit_int64(jcr->spool_size, ed2), jcr->rerunning,
220 jcr->VolSessionId, jcr->VolSessionTime, sd_client,
223 Dmsg1(100, ">stored: %s", sd->msg);
224 Dmsg2(100, "=== rstore=%p wstore=%p\n", rstore, wstore);
225 if (bget_dirmsg(sd) > 0) {
226 Dmsg1(100, "<stored: %s", sd->msg);
227 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
228 &jcr->VolSessionTime, &sd_auth_key) != 3) {
// NOTE(review): &sd_auth_key has type char (*)[100], but %s expects char *.
// The address is the same; `sd_auth_key` would be the correct argument.
// Also confirm that OKjob's Authorization field width is less than
// sizeof(sd_auth_key), so the terminating NUL fits.
229 Dmsg1(100, "BadJob=%s\n", sd->msg);
230 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
233 bfree_and_null(jcr->sd_auth_key);
234 jcr->sd_auth_key = bstrdup(sd_auth_key);
235 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
238 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
243 if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
244 !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
249 * We have two loops here. The first comes from the
250 * Storage = associated with the Job, and we need
251 * to attach to each one.
252 * The inner loop loops over all the alternative devices
253 * associated with each Storage. It selects the first
257 /* Do read side of storage daemon */
259 /* For the moment, only migrate, copy and vbackup have rpool */
260 if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) ||
261 (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
262 pm_strcpy(pool_type, jcr->rpool->pool_type);
263 pm_strcpy(pool_name, jcr->rpool->name());
265 pm_strcpy(pool_type, jcr->pool->pool_type);
266 pm_strcpy(pool_name, jcr->pool->name());
268 bash_spaces(pool_type);
269 bash_spaces(pool_name);
270 foreach_alist(storage, rstore) {
271 Dmsg1(100, "Rstore=%s\n", storage->name());
272 pm_strcpy(store_name, storage->name());
273 bash_spaces(store_name);
274 if (jcr->media_type) {
275 pm_strcpy(media_type, jcr->media_type); /* user override */
277 pm_strcpy(media_type, storage->media_type);
279 bash_spaces(media_type);
// append=0: read side
280 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
281 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
282 Dmsg1(100, "rstore >stored: %s", sd->msg);
284 /* Loop over alternative storage Devices until one is OK */
285 foreach_alist(dev, storage->device) {
286 pm_strcpy(device_name, dev->name());
287 bash_spaces(device_name);
288 sd->fsend(use_device, device_name.c_str());
289 Dmsg1(100, ">stored: %s", sd->msg);
291 sd->signal(BNET_EOD); /* end of Devices */
293 sd->signal(BNET_EOD); /* end of Storages */
294 if (bget_dirmsg(sd) > 0) {
295 Dmsg1(100, "<stored: %s", sd->msg);
296 /* ****FIXME**** save actual device name */
297 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
// NOTE(review): OK_device's %s is unbounded and writes into the POOL_MEM
// buffer behind device_name.c_str(). That is safe only if the buffer is at
// least as large as the reply in sd->msg; consider sizing device_name to
// the reply length before scanning.
302 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to read.\n"), device_name.c_str());
306 /* Do write side of storage daemon */
308 pm_strcpy(pool_type, jcr->pool->pool_type);
309 pm_strcpy(pool_name, jcr->pool->name());
310 bash_spaces(pool_type);
311 bash_spaces(pool_name);
312 foreach_alist(storage, wstore) {
313 Dmsg1(100, "Wstore=%s\n", storage->name());
314 pm_strcpy(store_name, storage->name());
315 bash_spaces(store_name);
316 pm_strcpy(media_type, storage->media_type);
317 bash_spaces(media_type);
// append=1: write side
318 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
319 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
321 Dmsg1(100, "wstore >stored: %s", sd->msg);
323 /* Loop over alternative storage Devices until one is OK */
324 foreach_alist(dev, storage->device) {
325 pm_strcpy(device_name, dev->name());
326 bash_spaces(device_name);
327 sd->fsend(use_device, device_name.c_str());
328 Dmsg1(100, ">stored: %s", sd->msg);
330 sd->signal(BNET_EOD); /* end of Devices */
332 sd->signal(BNET_EOD); /* end of Storages */
333 if (bget_dirmsg(sd) > 0) {
334 Dmsg1(100, "<stored: %s", sd->msg);
335 /* ****FIXME**** save actual device name */
336 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
// NOTE(review): same unbounded %s scan into device_name as on the read side.
341 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to write.\n"), device_name.c_str());
347 pm_strcpy(err_msg, sd->msg); /* save message */
348 Jmsg(jcr, M_FATAL, 0, _("\n"
349 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
350 device_name.c_str(), err_msg.c_str()/* sd->msg */);
352 Jmsg(jcr, M_FATAL, 0, _("\n"
353 " Storage daemon didn't accept Device \"%s\" command.\n"),
354 device_name.c_str());
361 * Start a thread to handle Storage daemon messages and
364 bool start_storage_daemon_message_thread(JCR *jcr)
// Takes a JCR reference for the message thread; msg_thread_cleanup() later
// releases it with free_jcr(). It resets the thread-state fields, spawns
// msg_thread(), and then waits until the thread has stored its id in
// jcr->SD_msg_chan. The wait stops early if the job is canceled or the
// thread has already finished.
// A pthread_create() failure is reported with M_ABORT.
// NOTE(review): SD_msg_chan and sd_msg_thread_done are also written by
// msg_thread()/msg_thread_cleanup(). No lock is visible around these
// accesses here. The loop body and return statements are not visible.
369 jcr->inc_use_count(); /* mark in use by msg thread */
370 jcr->sd_msg_thread_done = false;
371 jcr->SD_msg_chan = 0;
372 Dmsg0(150, "Start SD msg_thread.\n");
373 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
375 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
377 /* Wait for thread to start */
378 while (jcr->SD_msg_chan == 0) {
380 if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
384 Dmsg1(150, "SD msg_thread started. use=%d\n", jcr->use_count());
// Cleanup handler for msg_thread(). It is registered with
// pthread_cleanup_push() and run by pthread_cleanup_pop(1) or if the thread
// is cancelled. It ends any open catalog transaction, marks the message
// thread as finished, and wakes threads waiting on jcr->term_wait. It then
// drops thread-specific DB data and releases the message thread's JCR
// reference.
// NOTE(review): the lines between setting the flags and the broadcast are
// not visible; presumably they lock/unlock around the flag updates -- confirm.
388 extern "C" void msg_thread_cleanup(void *arg)
390 JCR *jcr = (JCR *)arg;
391 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
393 jcr->sd_msg_thread_done = true;
394 jcr->SD_msg_chan = 0;
396 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
397 Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
398 db_thread_cleanup(jcr->db); /* remove thread specific data */
399 free_jcr(jcr); /* release jcr */
403 * Handle the message channel (i.e. requests from the
405 * Note, we are running in a separate thread.
407 extern "C" void *msg_thread(void *arg)
// Thread body. It detaches itself and stores its id in jcr->SD_msg_chan,
// which start_storage_daemon_message_thread() polls. It then reads Storage
// daemon messages until the job is canceled or bget_dirmsg() returns a
// negative value. A "Job end" message copies the SD's final status,
// file/byte counts and error count into the JCR.
// On exit it queues a "connection lost" error on hard EOF for a job that
// was not canceled, and sets SDJobStatus for canceled or errored
// connections. Finally it runs msg_thread_cleanup() via
// pthread_cleanup_pop(1).
// NOTE(review): `n`, `sd`, JobStatus and JobBytes are declared in lines not
// visible here. The Job_start branch body is also not visible.
409 JCR *jcr = (JCR *)arg;
413 char Job[MAX_NAME_LENGTH];
414 uint32_t JobFiles, JobErrors;
417 pthread_detach(pthread_self());
419 jcr->SD_msg_chan = pthread_self();
420 pthread_cleanup_push(msg_thread_cleanup, arg);
421 sd = jcr->store_bsock;
423 /* Read the Storage daemon's output.
425 Dmsg0(100, "Start msg_thread loop\n");
427 while (!job_canceled(jcr) && (n=bget_dirmsg(sd)) >= 0) {
428 Dmsg1(400, "<stored: %s", sd->msg);
429 if (sscanf(sd->msg, Job_start, Job) == 1) {
432 if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
433 &JobBytes, &JobErrors) == 5) {
434 jcr->SDJobStatus = JobStatus; /* termination status */
435 jcr->SDJobFiles = JobFiles;
436 jcr->SDJobBytes = JobBytes;
437 jcr->SDErrors = JobErrors;
440 Dmsg1(400, "end loop use=%d\n", jcr->use_count());
442 if (n == BNET_HARDEOF && jcr->getJobStatus() != JS_Canceled) {
444 * This probably should be M_FATAL, but I am not 100% sure
445 * that this return *always* corresponds to a dropped line.
447 Qmsg(jcr, M_ERROR, 0, _("Director's connection to SD for this Job was lost.\n"));
449 if (jcr->getJobStatus() == JS_Canceled) {
450 jcr->SDJobStatus = JS_Canceled;
451 } else if (sd->is_error()) {
452 jcr->SDJobStatus = JS_ErrorTerminated;
454 pthread_cleanup_pop(1); /* remove and execute the handler */
// Block until msg_thread() reports completion through
// jcr->sd_msg_thread_done. It waits on jcr->term_wait with a deadline about
// 5 seconds ahead (tv_sec + 5). If the job is canceled while the message
// thread is still registered (SD_msg_chan != 0), it marks the SD socket as
// timed out and terminated and sends TIMEOUT_SIGNAL to the thread. When
// cancel_count reaches 6 (about 30 s, per the comment below), the wait is
// presumably abandoned; the break is not visible here. The visible code
// ends with jcr->setJobStatus(JS_Terminated).
// NOTE(review): pthread_cond_timedwait() requires `mutex` to be locked by
// the caller. The lock/unlock lines are not visible -- confirm they exist.
// NOTE(review): timeout.tv_nsec is not assigned in the visible lines. If it
// is really missing, the wait may fail with EINVAL. The return value of
// pthread_cond_timedwait() and the cancel_count increment are also not
// visible.
458 void wait_for_storage_daemon_termination(JCR *jcr)
460 int cancel_count = 0;
461 /* Now wait for Storage daemon to terminate our message thread */
462 while (!jcr->sd_msg_thread_done) {
465 struct timespec timeout;
467 gettimeofday(&tv, &tz);
469 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
470 Dmsg0(400, "I'm waiting for message thread termination.\n");
472 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
474 if (jcr->is_canceled()) {
475 if (jcr->SD_msg_chan) {
476 jcr->store_bsock->set_timed_out();
477 jcr->store_bsock->set_terminated();
478 sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL);
482 /* Give SD 30 seconds to clean up after cancel */
483 if (cancel_count == 6) {
487 jcr->setJobStatus(JS_Terminated);
491 * Send bootstrap file to Storage daemon.
492 * This is used for restore, verify VolumeToCatalog, migration,
495 bool send_bootstrap_file(JCR *jcr, BSOCK *sd)
// Sends "bootstrap\n", then the contents of jcr->RestoreBootstrap line by
// line, then BNET_EOD. If jcr->unlink_bsr is set, the file is deleted
// afterwards and the flag cleared. A file that cannot be opened is a fatal
// job error (JS_ErrorTerminated).
// NOTE(review): the Dmsg1 below formats jcr->RestoreBootstrap with %s
// *before* the NULL check. A NULL path there is undefined behavior for %s,
// so consider moving the debug message after the check.
// NOTE(review): fgets() splits lines longer than `buf` (size not visible)
// into several fsend() calls. Confirm that fclose(bs) happens on the
// success path; it is not visible in this excerpt. fsend() results are
// ignored.
499 const char *bootstrap = "bootstrap\n";
501 Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap);
502 if (!jcr->RestoreBootstrap) {
505 bs = fopen(jcr->RestoreBootstrap, "rb");
508 Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
509 jcr->RestoreBootstrap, be.bstrerror());
510 jcr->setJobStatus(JS_ErrorTerminated);
513 sd->fsend(bootstrap);
514 while (fgets(buf, sizeof(buf), bs)) {
515 sd->fsend("%s", buf);
517 sd->signal(BNET_EOD);
519 if (jcr->unlink_bsr) {
520 unlink(jcr->RestoreBootstrap);
521 jcr->unlink_bsr = false;
// Background thread. It creates a control JCR ("*DeviceInit*", JT_SYSTEM).
// Then, for up to MAX_TRIES attempts, it connects to the SD
// (retry_interval=10, max_retry_time=30, verbose=1) and calls
// update_device_res() for every Device resource, logging each result at
// debug level 900. It then frees jcr->store_bsock.
// NOTE(review): the loop exits, the release of `jcr` and the return value
// are not visible in this excerpt.
530 extern "C" void *device_thread(void *arg)
537 pthread_detach(pthread_self());
538 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
539 for (i=0; i < MAX_TRIES; i++) {
540 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
541 Dmsg0(900, "Failed connecting to SD.\n");
545 foreach_res(dev, R_DEVICE) {
546 if (!update_device_res(jcr, dev)) {
547 Dmsg1(900, "Error updating device=%s\n", dev->name());
549 Dmsg1(900, "Updated Device=%s\n", dev->name());
553 free_bsock(jcr->store_bsock);
562 * Start a thread to handle getting Device resource information
563 * from SD. This is called once at startup of the Director.
565 void init_device_resources()
// Spawns device_thread() with a NULL argument. A pthread_create() failure
// is reported with M_ABORT.
// NOTE(review): the abort text says "message thread", but this function
// creates the device thread (probably copied from
// start_storage_daemon_message_thread()). The message is left unchanged
// here.
570 Dmsg0(100, "Start Device thread.\n");
571 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
573 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));