2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version three of the GNU Affero General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU Affero General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
30 * Bacula Director -- msgchan.c -- handles the message channel
31 * to the Storage daemon and the File daemon.
33 * Kern Sibbald, August MM
35 * This routine runs as a thread and must be thread reentrant.
37 * Basic tasks done here:
38 * Open a message channel with the Storage daemon
39 * to authenticate ourselves and to pass the JobId.
40 * Create a thread to interact with the Storage daemon,
41 * which returns a job status and requests Catalog services, etc.
/*
 * Mutex paired with jcr->term_wait in
 * wait_for_storage_daemon_termination().
 */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Commands sent to Storage daemon */
static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
   "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
   "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s "
   "rerunning=%d VolSessionId=%d VolSessionTime=%d\n";
/* append=0 announces a read Storage, append=1 a write Storage. */
static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
   "pool_type=%s append=%d copy=%d stripe=%d\n";
static char use_device[] = "use device=%s\n";
/*
 * NOTE(review): update_device_res() sends query_device, but its
 * declaration is commented out here -- confirm that function is
 * compiled out, or restore a definition.
 */
//static char query_device[] = _("query device=%s");

/* Response from Storage daemon */
/*
 * NOTE(review): Authorization=%100s stores up to 100 characters plus a
 * terminating NUL, so the receiving buffer must hold at least 101 bytes.
 */
static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
static char OK_device[] = "3000 OK use device device=%s\n";

/*
 * Storage Daemon requests, parsed with sscanf() by msg_thread().
 * The %127s job-name conversions need a buffer of at least 128 bytes.
 */
static char Job_start[] = "3010 Job %127s start\n";
/*
 * JobFiles and JobErrors are parsed into uint32_t variables, so both use
 * %u (a %d conversion with a uint32_t* argument is a type mismatch).
 */
static char Job_end[] =
   "3099 Job %127s end JobStatus=%d JobFiles=%u JobBytes=%lld JobErrors=%u\n";

/* Forward referenced functions */
extern "C" void *msg_thread(void *arg);
73 * Establish a message channel connection with the Storage daemon
74 * and perform authentication.
// connect_to_storage_daemon: connect jcr to its Storage daemon and authenticate.
//   jcr            - job control record; the connected socket is published
//                    in jcr->store_bsock.
//   retry_interval, max_retry_time, verbose - passed through unchanged to
//                    BSOCK::connect().
// Returns true immediately when jcr->store_bsock is already set.
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77 int max_retry_time, int verbose)
// NOTE(review): the socket is allocated before the "already connected"
// check below, and no release of `sd` is visible on that early return.
// This looks like a leak; consider calling new_bsock() after the check.
79 BSOCK *sd = new_bsock();
83 if (jcr->store_bsock) {
84 return true; /* already connected */
87 /* If there is a write storage use it */
// Heartbeat: use the Storage resource's own interval when it is non-zero,
// otherwise fall back to the Director's interval.
94 if (store->heartbeat_interval) {
95 heart_beat = store->heartbeat_interval;
97 heart_beat = director->heartbeat_interval;
101 * Open message channel with the Storage daemon
103 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
105 sd->set_source_address(director->DIRsrc_addr);
106 if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
107 store->address, NULL, store->SDport, verbose)) {
// Record which Storage resource this socket talks to, and publish the
// socket on the JCR before authentication is attempted.
115 sd->res = (RES *)store; /* save pointer to other end */
116 jcr->store_bsock = sd;
// On authentication failure the JCR stops referencing the socket
// (whether the socket itself is closed is not visible in this listing).
118 if (!authenticate_storage_daemon(jcr, store)) {
120 jcr->store_bsock = NULL;
127 * Here we ask the SD to send us the info for a
128 * particular device resource.
// update_device_res: ask the Storage daemon about `dev`.
// It first ensures a connection via connect_to_storage_daemon(jcr, 5, 30, 0)
// (retry_interval 5, max_retry_time 30, not verbose), which returns at once
// if jcr->store_bsock is already set.
// NOTE(review): `query_device` is present only as a commented-out
// declaration near the top of this file. Confirm that this function is
// compiled out, or that the format is defined elsewhere; otherwise it
// will not build.
131 bool update_device_res(JCR *jcr, DEVICE *dev)
133 POOL_MEM device_name;
135 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
138 sd = jcr->store_bsock;
// bash_spaces() presumably encodes embedded spaces so the name survives
// the space-separated protocol -- confirm against its definition.
139 pm_strcpy(device_name, dev->name());
140 bash_spaces(device_name);
141 sd->fsend(query_device, device_name.c_str());
142 Dmsg1(100, ">stored: %s\n", sd->msg);
143 /* The data is returned through Device_update */
144 if (bget_dirmsg(sd) <= 0) {
// Reply expected from the SD after the bootstrap file has been sent
// (checked with response() in start_storage_daemon_job()).
151 static char OKbootstrap[] = "3000 OK bootstrap\n";
154 * Start a job with the Storage daemon
// start_storage_daemon_job: start a job on the Storage daemon.
//   1. Send the Job command (jobcmd) and parse the reply for
//      VolSessionId, VolSessionTime and the SD authorization key.
//   2. If send_bsr is set, send the bootstrap file and expect OKbootstrap.
//   3. Announce each read Storage in rstore (append=0) and each write
//      Storage in wstore (append=1), each followed by its candidate Devices.
// Uses jcr->store_bsock directly; presumably the caller has already
// connected (e.g. via connect_to_storage_daemon) -- confirm.
156 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
162 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
163 POOL_MEM job_name, client_name, fileset_name;
166 char ed1[30], ed2[30];
168 sd = jcr->store_bsock;
170 * Now send JobId and permissions, and get back the authorization key.
172 pm_strcpy(job_name, jcr->job->name());
173 bash_spaces(job_name);
174 pm_strcpy(client_name, jcr->client->name());
175 bash_spaces(client_name);
176 pm_strcpy(fileset_name, jcr->fileset->name());
177 bash_spaces(fileset_name);
// A FileSet without an MD5 is given the placeholder "**Dummy**".
178 if (jcr->fileset->MD5[0] == 0) {
179 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
181 /* If rescheduling, cancel the previous incarnation of this job
182 * with the SD, which might be waiting on the FD connection.
183 * If we do not cancel it the SD will not accept a new connection
184 * for the same jobid.
186 if (jcr->reschedule_count) {
187 sd->fsend("cancel Job=%s\n", jcr->Job);
// Discard replies until recv() returns a negative value (the loop body
// is not visible in this listing).
188 while (sd->recv() >= 0)
// NoAttr is the inverse of the pool's catalog_files setting.
191 sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
192 job_name.c_str(), client_name.c_str(),
193 jcr->getJobType(), jcr->getJobLevel(),
194 fileset_name.c_str(), !jcr->pool->catalog_files,
195 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
196 jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
197 edit_int64(jcr->spool_size, ed2), jcr->rerunning,
198 jcr->VolSessionId, jcr->VolSessionTime);
199 Dmsg1(100, ">stored: %s", sd->msg);
200 if (bget_dirmsg(sd) > 0) {
201 Dmsg1(100, "<stored: %s", sd->msg);
// NOTE(review): OKjob parses Authorization with %100s, which stores up to
// 100 characters plus a NUL. auth_key is declared on a line not shown
// here; confirm it is at least 101 bytes.
202 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
203 &jcr->VolSessionTime, &auth_key) != 3) {
204 Dmsg1(100, "BadJob=%s\n", sd->msg);
205 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
// Replace any previously stored SD authorization key with the new one.
208 bfree_and_null(jcr->sd_auth_key);
209 jcr->sd_auth_key = bstrdup(auth_key);
210 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
213 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
// With send_bsr set, failing to send the bootstrap or not receiving
// OKbootstrap takes the error branch (its body is not visible here).
218 if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
219 !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
224 * We have two loops here. The first comes from the
225 * Storage = associated with the Job, and we need
226 * to attach to each one.
227 * The inner loop loops over all the alternative devices
228 * associated with each Storage. It selects the first
// NOTE(review): the read-side and write-side sections below are nearly
// identical (only the store list, pool and append flag differ); a shared
// helper would remove the duplication.
232 /* Do read side of storage daemon */
234 /* For the moment, only migrate, copy and vbackup have rpool */
235 if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) ||
236 (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
237 pm_strcpy(pool_type, jcr->rpool->pool_type);
238 pm_strcpy(pool_name, jcr->rpool->name());
240 pm_strcpy(pool_type, jcr->pool->pool_type);
241 pm_strcpy(pool_name, jcr->pool->name());
243 bash_spaces(pool_type);
244 bash_spaces(pool_name);
245 foreach_alist(storage, rstore) {
246 Dmsg1(100, "Rstore=%s\n", storage->name());
247 pm_strcpy(store_name, storage->name());
248 bash_spaces(store_name);
249 pm_strcpy(media_type, storage->media_type);
250 bash_spaces(media_type);
// append=0: this Storage is used for reading.
251 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
252 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
253 Dmsg1(100, "rstore >stored: %s", sd->msg);
255 /* Loop over alternative storage Devices until one is OK */
256 foreach_alist(dev, storage->device) {
257 pm_strcpy(device_name, dev->name());
258 bash_spaces(device_name);
259 sd->fsend(use_device, device_name.c_str());
260 Dmsg1(100, ">stored: %s", sd->msg);
// One EOD signal ends each Storage's Device list; a second ends the
// Storage list.
262 sd->signal(BNET_EOD); /* end of Devices */
264 sd->signal(BNET_EOD); /* end of Storages */
265 if (bget_dirmsg(sd) > 0) {
266 Dmsg1(100, "<stored: %s", sd->msg);
267 /* ****FIXME**** save actual device name */
// NOTE(review): the unbounded %s in OK_device writes through c_str()
// into device_name's buffer, which was sized for the name sent above.
// A longer device name in the reply would overflow it; parse with a
// width limit or into a buffer sized from sd->msg.
268 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
274 /* Do write side of storage daemon */
276 pm_strcpy(pool_type, jcr->pool->pool_type);
277 pm_strcpy(pool_name, jcr->pool->name());
278 bash_spaces(pool_type);
279 bash_spaces(pool_name);
280 foreach_alist(storage, wstore) {
281 pm_strcpy(store_name, storage->name());
282 bash_spaces(store_name);
283 pm_strcpy(media_type, storage->media_type);
284 bash_spaces(media_type);
// append=1: this Storage is used for writing.
285 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
286 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
288 Dmsg1(100, "wstore >stored: %s", sd->msg);
290 /* Loop over alternative storage Devices until one is OK */
291 foreach_alist(dev, storage->device) {
292 pm_strcpy(device_name, dev->name());
293 bash_spaces(device_name);
294 sd->fsend(use_device, device_name.c_str());
295 Dmsg1(100, ">stored: %s", sd->msg);
297 sd->signal(BNET_EOD); /* end of Devices */
299 sd->signal(BNET_EOD); /* end of Storages */
300 if (bget_dirmsg(sd) > 0) {
301 Dmsg1(100, "<stored: %s", sd->msg);
302 /* ****FIXME**** save actual device name */
// NOTE(review): same unbounded %s write into device_name as on the read side.
303 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
// Error reporting: copy sd->msg into err_msg before building the
// M_FATAL message.
311 pm_strcpy(err_msg, sd->msg); /* save message */
312 Jmsg(jcr, M_FATAL, 0, _("\n"
313 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
314 device_name.c_str(), err_msg.c_str()/* sd->msg */);
316 Jmsg(jcr, M_FATAL, 0, _("\n"
317 " Storage daemon didn't accept Device \"%s\" command.\n"),
318 device_name.c_str());
321 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
327 * Start a thread to handle Storage daemon messages and
// start_storage_daemon_message_thread: launch msg_thread for jcr.
// Steps:
//   - take a JCR use count for the thread;
//   - reset sd_msg_thread_done and SD_msg_chan;
//   - create msg_thread (an M_ABORT message is issued if creation fails);
//   - poll until the thread stores its id in SD_msg_chan, or until the job
//     is canceled or the thread has already finished.
// NOTE(review): pthread_t is opaque in POSIX, so comparing SD_msg_chan
// with 0 assumes an arithmetic pthread_t. Also, SD_msg_chan and
// sd_msg_thread_done are polled with no lock visible here; confirm the
// intended synchronization.
330 bool start_storage_daemon_message_thread(JCR *jcr)
335 jcr->inc_use_count(); /* mark in use by msg thread */
336 jcr->sd_msg_thread_done = false;
337 jcr->SD_msg_chan = 0;
338 Dmsg0(100, "Start SD msg_thread.\n");
339 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
341 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
343 /* Wait for thread to start */
// msg_thread sets SD_msg_chan = pthread_self() right after starting.
344 while (jcr->SD_msg_chan == 0) {
346 if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
350 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
// msg_thread_cleanup: cancellation/cleanup handler for msg_thread.
// msg_thread registers it with pthread_cleanup_push() and runs it through
// pthread_cleanup_pop(1). It ends any open catalog transaction, marks the
// message thread as done, clears SD_msg_chan and wakes threads waiting on
// jcr->term_wait. It then releases thread-specific DB data and calls
// free_jcr(), presumably balancing the inc_use_count() taken in
// start_storage_daemon_message_thread().
354 extern "C" void msg_thread_cleanup(void *arg)
356 JCR *jcr = (JCR *)arg;
357 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
359 jcr->sd_msg_thread_done = true;
360 jcr->SD_msg_chan = 0;
// NOTE(review): if this broadcast is not made while holding the mutex the
// waiter uses, a wakeup can be missed. The waiter's 5-second timed wait
// bounds that delay.
362 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
363 Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
364 db_thread_cleanup(jcr->db); /* remove thread specific data */
// Must be last: jcr may be freed here.
365 free_jcr(jcr); /* release jcr */
369 * Handle the message channel (i.e. requests from the
371 * Note, we are running in a separate thread.
// msg_thread: thread entry point; arg is the JCR.
// Setup: detaches itself, publishes its id in jcr->SD_msg_chan (which
// start_storage_daemon_message_thread polls for) and registers
// msg_thread_cleanup.
// Loop: reads SD messages until the job is canceled or bget_dirmsg()
// returns a negative value. A "3099 Job ... end" message updates
// SDJobStatus, SDJobFiles, SDJobBytes and SDErrors on the JCR.
// Exit:
//   - a hard EOF queues an M_ERROR message;
//   - a socket error sets SDJobStatus to JS_ErrorTerminated;
//   - the cleanup handler always runs via pthread_cleanup_pop(1).
373 extern "C" void *msg_thread(void *arg)
375 JCR *jcr = (JCR *)arg;
// Job_start/Job_end parse the job name with %127s; confirm that
// MAX_NAME_LENGTH (defined elsewhere) is at least 128.
379 char Job[MAX_NAME_LENGTH];
380 uint32_t JobFiles, JobErrors;
383 pthread_detach(pthread_self());
385 jcr->SD_msg_chan = pthread_self();
386 pthread_cleanup_push(msg_thread_cleanup, arg);
387 sd = jcr->store_bsock;
389 /* Read the Storage daemon's output.
391 Dmsg0(100, "Start msg_thread loop\n");
393 while (!job_canceled(jcr) && (n=bget_dirmsg(sd)) >= 0) {
394 Dmsg1(400, "<stored: %s", sd->msg);
395 if (sscanf(sd->msg, Job_start, Job) == 1) {
// Copy the SD's final totals for this job onto the JCR.
398 if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
399 &JobBytes, &JobErrors) == 5) {
400 jcr->SDJobStatus = JobStatus; /* termination status */
401 jcr->SDJobFiles = JobFiles;
402 jcr->SDJobBytes = JobBytes;
403 jcr->SDErrors = JobErrors;
406 Dmsg1(400, "end loop use=%d\n", jcr->use_count());
408 if (n == BNET_HARDEOF) {
410 * This probably should be M_FATAL, but I am not 100% sure
411 * that this return *always* corresponds to a dropped line.
413 Qmsg(jcr, M_ERROR, 0, _("Director's comm line to SD dropped.\n"));
415 if (is_bnet_error(sd)) {
416 jcr->SDJobStatus = JS_ErrorTerminated;
418 pthread_cleanup_pop(1); /* remove and execute the handler */
// wait_for_storage_daemon_termination: block until msg_thread reports done.
// It waits on jcr->term_wait, waking at least every 5 seconds. While the
// job is canceled and the message thread is still registered in
// SD_msg_chan, it marks the SD socket timed-out/terminated and sends the
// thread TIMEOUT_SIGNAL. Once cancel_count reaches 6 (about 30 s,
// according to the comment below) it stops waiting. It sets the job
// status to JS_Terminated.
// NOTE(review): the lines that increment cancel_count and leave the loop
// are not visible in this listing.
422 void wait_for_storage_daemon_termination(JCR *jcr)
424 int cancel_count = 0;
425 /* Now wait for Storage daemon to terminate our message thread */
426 while (!jcr->sd_msg_thread_done) {
429 struct timespec timeout;
// Absolute deadline = now + 5 s. It is based on gettimeofday(), i.e. the
// realtime clock, which is pthread_cond_timedwait's default clock.
// NOTE(review): no visible line assigns timeout.tv_nsec. An uninitialized
// or out-of-range tv_nsec makes pthread_cond_timedwait fail with EINVAL;
// confirm it is set.
431 gettimeofday(&tv, &tz);
433 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
434 Dmsg0(400, "I'm waiting for message thread termination.\n");
// NOTE(review): pthread_cond_timedwait requires the caller to hold
// `mutex`; the lock/unlock around this call is not visible here.
436 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
438 if (jcr->is_canceled()) {
439 if (jcr->SD_msg_chan) {
440 jcr->store_bsock->set_timed_out();
441 jcr->store_bsock->set_terminated();
442 sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL);
446 /* Give SD 30 seconds to clean up after cancel */
447 if (cancel_count == 6) {
451 jcr->setJobStatus(JS_Terminated);
455 * Send bootstrap file to Storage daemon.
456 * This is used for restore, verify VolumeToCatalog, migration,
// send_bootstrap_file: stream jcr->RestoreBootstrap to `sd`.
//   - Sends "bootstrap\n", then each fgets() chunk of the file (lines
//     longer than buf go out as several fsend calls), then BNET_EOD.
//   - If the file cannot be opened, issues an M_FATAL message and sets
//     the job status to JS_ErrorTerminated.
//   - If jcr->unlink_bsr is set, deletes the file afterwards and clears
//     the flag.
// NOTE(review): no fclose(bs) is visible in this listing; confirm the
// stream is closed on every path.
459 bool send_bootstrap_file(JCR *jcr, BSOCK *sd)
463 const char *bootstrap = "bootstrap\n";
// NOTE(review): this formats RestoreBootstrap with %s before the NULL
// check below. With debug level >= 400 and no bootstrap file, a NULL
// pointer reaches %s. Move this message after the check.
465 Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap);
466 if (!jcr->RestoreBootstrap) {
469 bs = fopen(jcr->RestoreBootstrap, "rb");
472 Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
473 jcr->RestoreBootstrap, be.bstrerror());
474 jcr->setJobStatus(JS_ErrorTerminated);
477 sd->fsend(bootstrap);
// File contents are sent through "%s" so they are never interpreted as a
// format string.
478 while (fgets(buf, sizeof(buf), bs)) {
479 sd->fsend("%s", buf);
481 sd->signal(BNET_EOD);
483 if (jcr->unlink_bsr) {
484 unlink(jcr->RestoreBootstrap);
485 jcr->unlink_bsr = false;
// device_thread: detached thread that queries the SD for Device information.
// It creates a control JCR named "*DeviceInit*" (JT_SYSTEM) and makes up
// to MAX_TRIES attempts to connect (retry_interval 10, max_retry_time 30,
// verbose). For every R_DEVICE resource it calls update_device_res(),
// then closes jcr->store_bsock and clears it.
// The loop control between these steps is not fully visible in this
// listing.
494 extern "C" void *device_thread(void *arg)
501 pthread_detach(pthread_self());
502 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
503 for (i=0; i < MAX_TRIES; i++) {
504 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
505 Dmsg0(900, "Failed connecting to SD.\n");
509 foreach_res(dev, R_DEVICE) {
510 if (!update_device_res(jcr, dev)) {
511 Dmsg1(900, "Error updating device=%s\n", dev->name());
513 Dmsg1(900, "Updated Device=%s\n", dev->name());
517 bnet_close(jcr->store_bsock);
518 jcr->store_bsock = NULL;
527 * Start a thread to handle getting Device resource information
528 * from SD. This is called once at startup of the Director.
// init_device_resources: start device_thread. The thread detaches itself;
// if pthread_create fails, an M_ABORT message is issued.
530 void init_device_resources()
535 Dmsg0(100, "Start Device thread.\n");
536 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
// NOTE(review): this text says "message thread", but the thread being
// created is device_thread (copied from
// start_storage_daemon_message_thread). Consider "Cannot create device
// thread".
538 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));