2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
30 * Bacula Director -- msgchan.c -- handles the message channel
31 * to the Storage daemon and the File daemon.
33 * Kern Sibbald, August MM
35 * This routine runs as a thread and must be thread reentrant.
37 * Basic tasks done here:
38 * Open a message channel with the Storage daemon
39 * to authenticate ourselves and to pass the JobId.
40 * Create a thread to interact with the Storage daemon
41 * who returns a job status and requests Catalog services, etc.
// File-scope state and the wire-protocol strings exchanged with the
// Storage daemon (SD).
// NOTE(review): the numbers embedded at the start of each line show that
// some original lines are absent from this listing (for example the tail
// of jobcmd), so only what is visible is described here.
//
// Mutex handed to pthread_cond_timedwait in
// wait_for_storage_daemon_termination.
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50 /* Commands sent to Storage daemon */
// printf-style format passed to sd->fsend in start_storage_daemon_job.
// The closing part of this literal is on a line not shown here.
51 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
52 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
53 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s "
// Sent once per storage; append is 0 for the read side and 1 for the
// write side in start_storage_daemon_job.
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56 "pool_type=%s append=%d copy=%d stripe=%d\n";
// Sent once per candidate device of a storage.
57 static char use_device[] = "use device=%s\n";
// NOTE(review): this definition is commented out, yet update_device_res
// still passes query_device to fsend. Confirm that function is compiled
// out or that another definition exists.
58 //static char query_device[] = _("query device=%s");
60 /* Response from Storage daemon */
// sscanf pattern. %100s stores at most 100 characters plus a terminator,
// so the destination needs at least 101 bytes (the size of auth_key is not
// visible in this listing - TODO confirm).
61 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
// sscanf pattern. The %s carries no width limit, so the destination must
// be large enough for the whole device token.
62 static char OK_device[] = "3000 OK use device device=%s\n";
64 /* Storage Daemon requests */
// sscanf patterns used by msg_thread. %127s caps the job name at 127
// characters plus terminator; msg_thread reads it into
// char Job[MAX_NAME_LENGTH] (presumably at least 128 - TODO confirm).
65 static char Job_start[] = "3010 Job %127s start\n";
66 static char Job_end[] =
67 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";
69 /* Forward referenced functions */
// Thread entry point started by start_storage_daemon_message_thread.
70 extern "C" void *msg_thread(void *arg);
73 * Establish a message channel connection with the Storage daemon
74 * and perform authentication.
// Opens and authenticates the Director-to-Storage-daemon socket and stores
// it in jcr->store_bsock.
//
// jcr            - job control record that receives the socket
// retry_interval - forwarded unchanged to sd->connect
// max_retry_time - forwarded unchanged to sd->connect
// verbose        - forwarded unchanged to sd->connect
//
// Returns true immediately when jcr->store_bsock is already set. The other
// return statements are on lines not shown in this listing.
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77 int max_retry_time, int verbose)
// NOTE(review): sd is allocated before the already-connected early return
// just below. Unless a hidden line releases it, that path leaks the new
// socket object - confirm and consider allocating after the check.
79 BSOCK *sd = new_bsock();
83 if (jcr->store_bsock) {
84 return true; /* already connected */
// The selection of store (write storage preferred, per the comment below)
// happens on lines not shown here.
87 /* If there is a write storage use it */
// Heartbeat: the storage resource value wins when it is non-zero,
// otherwise the director value is used.
94 if (store->heartbeat_interval) {
95 heart_beat = store->heartbeat_interval;
97 heart_beat = director->heartbeat_interval;
101 * Open message channel with the Storage daemon
103 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
// Bind the outgoing connection to the configured director source address.
105 sd->set_source_address(director->DIRsrc_addr);
106 if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
107 store->address, NULL, store->SDport, verbose)) {
115 sd->res = (RES *)store; /* save pointer to other end */
// The socket is published on the jcr before authentication and reset to
// NULL again when authentication fails.
116 jcr->store_bsock = sd;
118 if (!authenticate_storage_daemon(jcr, store)) {
120 jcr->store_bsock = NULL;
127 * Here we ask the SD to send us the info for a
128 * particular device resource.
// Asks the Storage daemon for the current information about one device
// resource.
//
// jcr - job control record whose store_bsock is used for the exchange
// dev - device resource whose name is sent in the query
//
// The return statements are on lines not shown in this listing.
131 bool update_device_res(JCR *jcr, DEVICE *dev)
133 POOL_MEM device_name;
// Connect with retry_interval=5, max_retry_time=30, verbose=0 (argument
// order as declared by connect_to_storage_daemon).
135 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
138 sd = jcr->store_bsock;
139 pm_strcpy(device_name, dev->name());
// presumably encodes blanks so the name stays a single token on the wire -
// TODO confirm against bash_spaces
140 bash_spaces(device_name);
// NOTE(review): the only query_device definition visible in this file is
// commented out. Confirm this function is compiled out or that the symbol
// is defined elsewhere.
141 sd->fsend(query_device, device_name.c_str());
142 Dmsg1(100, ">stored: %s\n", sd->msg);
143 /* The data is returned through Device_update */
// A result of zero or less from bget_dirmsg is treated as failure here.
144 if (bget_dirmsg(sd) <= 0) {
// Reply expected from the Storage daemon after a bootstrap file was sent;
// passed to response() in start_storage_daemon_job.
151 static char OKbootstrap[] = "3000 OK bootstrap\n";
154 * Start a job with the Storage daemon
// Sends the Job command, the optional bootstrap file and the read and
// write storage/device lists to the Storage daemon over jcr->store_bsock.
//
// jcr      - job control record; its job, client, fileset and pool (and
//            rpool for migrate, copy and virtual full) are read to build
//            the commands
// rstore   - storages announced with append=0 (read side)
// wstore   - storages announced with append=1 (write side)
// send_bsr - when true, send_bootstrap_file is called and an OKbootstrap
//            reply is required
//
// NOTE(review): several original lines (declarations, braces, returns and
// the guards around the read and write sections) are not shown in this
// listing, so return values and the exact branch structure cannot be
// confirmed from here.
156 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
162 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
163 POOL_MEM job_name, client_name, fileset_name;
// Scratch buffers for edit_int64 (JobId and spool size).
166 char ed1[30], ed2[30];
168 sd = jcr->store_bsock;
170 * Now send JobId and permissions, and get back the authorization key.
// Names are copied and passed through bash_spaces before being placed in
// the command (presumably so each stays one token - TODO confirm).
172 pm_strcpy(job_name, jcr->job->name());
173 bash_spaces(job_name);
174 pm_strcpy(client_name, jcr->client->name());
175 bash_spaces(client_name);
176 pm_strcpy(fileset_name, jcr->fileset->name());
177 bash_spaces(fileset_name);
// An empty FileSet MD5 is replaced by a placeholder so the FileSetMD5
// field of the command is never empty.
178 if (jcr->fileset->MD5[0] == 0) {
179 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
181 /* If rescheduling, cancel the previous incarnation of this job
182 * with the SD, which might be waiting on the FD connection.
183 * If we do not cancel it the SD will not accept a new connection
184 * for the same jobid.
186 if (jcr->reschedule_count) {
187 sd->fsend("cancel Job=%s\n", jcr->Job);
// Reads replies until recv reports a negative value; the loop body is on
// a line not shown here.
188 while (sd->recv() >= 0)
// Argument order must match the fields of jobcmd. NoAttr is the negation
// of pool->catalog_files.
191 sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
192 job_name.c_str(), client_name.c_str(),
193 jcr->get_JobType(), jcr->get_JobLevel(),
194 fileset_name.c_str(), !jcr->pool->catalog_files,
195 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
196 jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
197 edit_int64(jcr->spool_size, ed2));
198 Dmsg1(100, ">stored: %s", sd->msg);
199 if (bget_dirmsg(sd) > 0) {
200 Dmsg1(100, "<stored: %s", sd->msg);
// All three OKjob fields must parse; otherwise the job is failed with the
// text the SD sent.
// NOTE(review): the declaration of auth_key is not visible; OKjob limits
// the token to 100 characters, so the buffer needs at least 101 bytes.
201 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
202 &jcr->VolSessionTime, &auth_key) != 3) {
203 Dmsg1(100, "BadJob=%s\n", sd->msg);
204 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
// Replace any previously stored key with a copy of the new one.
207 bfree_and_null(jcr->sd_auth_key);
208 jcr->sd_auth_key = bstrdup(auth_key);
209 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
212 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
// The bootstrap is only sent on request, and both the send and the
// OKbootstrap reply must succeed.
217 if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
218 !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
223 * We have two loops here. The first comes from the
224 * Storage = associated with the Job, and we need
225 * to attach to each one.
226 * The inner loop loops over all the alternative devices
227 * associated with each Storage. It selects the first
231 /* Do read side of storage daemon */
233 /* For the moment, only migrate, copy and vbackup have rpool */
234 if (jcr->get_JobType() == JT_MIGRATE || jcr->get_JobType() == JT_COPY ||
235 (jcr->get_JobType() == JT_BACKUP && jcr->get_JobLevel() == L_VIRTUAL_FULL)) {
236 pm_strcpy(pool_type, jcr->rpool->pool_type);
237 pm_strcpy(pool_name, jcr->rpool->name());
239 pm_strcpy(pool_type, jcr->pool->pool_type);
240 pm_strcpy(pool_name, jcr->pool->name());
242 bash_spaces(pool_type);
243 bash_spaces(pool_name);
// Outer loop: one use_storage command per read storage (append=0).
// copy and stripe are declared on lines not shown here.
244 foreach_alist(storage, rstore) {
245 Dmsg1(100, "Rstore=%s\n", storage->name());
246 pm_strcpy(store_name, storage->name());
247 bash_spaces(store_name);
248 pm_strcpy(media_type, storage->media_type);
249 bash_spaces(media_type);
250 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
251 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
252 Dmsg1(100, "rstore >stored: %s", sd->msg);
254 /* Loop over alternative storage Devices until one is OK */
// Inner loop: one use_device command per device of this storage.
255 foreach_alist(dev, storage->device) {
256 pm_strcpy(device_name, dev->name());
257 bash_spaces(device_name);
258 sd->fsend(use_device, device_name.c_str());
259 Dmsg1(100, ">stored: %s", sd->msg);
261 sd->signal(BNET_EOD); /* end of Devices */
263 sd->signal(BNET_EOD); /* end of Storages */
264 if (bget_dirmsg(sd) > 0) {
265 Dmsg1(100, "<stored: %s", sd->msg);
266 /* ****FIXME**** save actual device name */
// NOTE(review): OK_device uses an unbounded %s and writes straight into
// the current device_name buffer, so a reply token longer than that buffer
// would overflow it - confirm the buffer capacity or bound the width.
267 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
273 /* Do write side of storage daemon */
// The write side always uses jcr->pool, never rpool.
275 pm_strcpy(pool_type, jcr->pool->pool_type);
276 pm_strcpy(pool_name, jcr->pool->name());
277 bash_spaces(pool_type);
278 bash_spaces(pool_name);
// Same two-level announcement as the read side, with append=1.
279 foreach_alist(storage, wstore) {
280 pm_strcpy(store_name, storage->name());
281 bash_spaces(store_name);
282 pm_strcpy(media_type, storage->media_type);
283 bash_spaces(media_type);
284 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
285 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
287 Dmsg1(100, "wstore >stored: %s", sd->msg);
289 /* Loop over alternative storage Devices until one is OK */
290 foreach_alist(dev, storage->device) {
291 pm_strcpy(device_name, dev->name());
292 bash_spaces(device_name);
293 sd->fsend(use_device, device_name.c_str());
294 Dmsg1(100, ">stored: %s", sd->msg);
296 sd->signal(BNET_EOD); /* end of Devices */
298 sd->signal(BNET_EOD); /* end of Storages */
299 if (bget_dirmsg(sd) > 0) {
300 Dmsg1(100, "<stored: %s", sd->msg);
301 /* ****FIXME**** save actual device name */
// NOTE(review): same unbounded %s concern as on the read side.
302 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
// The SD text is copied into err_msg before Jmsg is called and that copy
// is what gets reported (the conditions selecting between the three
// messages below are on lines not shown here).
310 pm_strcpy(err_msg, sd->msg); /* save message */
311 Jmsg(jcr, M_FATAL, 0, _("\n"
312 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
313 device_name.c_str(), err_msg.c_str()/* sd->msg */);
315 Jmsg(jcr, M_FATAL, 0, _("\n"
316 " Storage daemon didn't accept Device \"%s\" command.\n"),
317 device_name.c_str());
320 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
326 * Start a thread to handle Storage daemon messages and
// Creates the thread that runs msg_thread for this job and waits until
// that thread has announced itself through jcr->SD_msg_chan.
//
// jcr - job control record handed to the new thread as its argument
//
// The return statements are on lines not shown in this listing.
329 bool start_storage_daemon_message_thread(JCR *jcr)
// Take a reference for the new thread; msg_thread_cleanup drops it again
// through free_jcr.
334 jcr->inc_use_count(); /* mark in use by msg thread */
// Reset the two handshake fields before the thread exists.
335 jcr->sd_msg_thread_done = false;
336 jcr->SD_msg_chan = 0;
337 Dmsg0(100, "Start SD msg_thread.\n");
338 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
340 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
342 /* Wait for thread to start */
// msg_thread stores pthread_self() in SD_msg_chan once it runs. The
// statement that paces this loop is on a line not shown here.
343 while (jcr->SD_msg_chan == 0) {
// Stop waiting if the job was cancelled or the thread already finished;
// the action taken is on a line not shown here.
345 if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
349 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
// Cleanup handler registered by msg_thread with pthread_cleanup_push.
// arg is the JCR pointer that was given to the message thread.
353 extern "C" void msg_thread_cleanup(void *arg)
355 JCR *jcr = (JCR *)arg;
356 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
// Mark the thread as finished and clear its id so that
// wait_for_storage_daemon_termination stops looping and no longer
// signals the thread.
357 jcr->sd_msg_thread_done = true;
358 jcr->SD_msg_chan = 0;
359 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
360 Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
// Drops the reference taken by start_storage_daemon_message_thread.
361 free_jcr(jcr); /* release jcr */
362 db_thread_cleanup(); /* remove thread specific data */
366 * Handle the message channel (i.e. requests from the
368 * Note, we are running in a separate thread.
// Thread body that reads Storage daemon messages for one job until the
// job is cancelled or bget_dirmsg returns a negative value.
// arg is the JCR pointer of the job. The return statement is on a line
// not shown in this listing.
370 extern "C" void *msg_thread(void *arg)
372 JCR *jcr = (JCR *)arg;
// Receives the job name parsed with the %127s field of Job_start and
// Job_end (MAX_NAME_LENGTH is presumably at least 128 - TODO confirm).
375 char Job[MAX_NAME_LENGTH];
// NOTE(review): JobFiles is uint32_t but Job_end scans it with %d; %u
// would match the declared type.
376 uint32_t JobFiles, JobErrors;
// Detached: nobody joins this thread. Completion is reported through
// jcr->sd_msg_thread_done in msg_thread_cleanup instead.
379 pthread_detach(pthread_self());
// Publishing the thread id releases the wait loop in
// start_storage_daemon_message_thread.
381 jcr->SD_msg_chan = pthread_self();
382 pthread_cleanup_push(msg_thread_cleanup, arg);
383 sd = jcr->store_bsock;
385 /* Read the Storage daemon's output.
387 Dmsg0(100, "Start msg_thread loop\n");
388 while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
389 Dmsg1(400, "<stored: %s", sd->msg);
// A Job_start message is recognised here; what is done with it is on
// lines not shown.
390 if (sscanf(sd->msg, Job_start, Job) == 1) {
// A complete Job_end message (all five fields) carries the final SD
// status and counters, which are copied into the jcr.
393 if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
394 &JobBytes, &JobErrors) == 5) {
395 jcr->SDJobStatus = JobStatus; /* termination status */
396 jcr->SDJobFiles = JobFiles;
397 jcr->SDJobBytes = JobBytes;
398 jcr->SDErrors = JobErrors;
401 Dmsg1(400, "end loop use=%d\n", jcr->use_count());
// A socket error overrides whatever status was recorded above.
403 if (is_bnet_error(sd)) {
404 jcr->SDJobStatus = JS_ErrorTerminated;
// The non-zero argument makes pthread_cleanup_pop run msg_thread_cleanup
// on the normal exit path as well.
406 pthread_cleanup_pop(1); /* remove and execute the handler */
// Blocks until the message thread has set jcr->sd_msg_thread_done,
// waking up every 5 seconds to check for job cancellation.
//
// jcr - job control record whose message thread is awaited
//
// NOTE(review): lines are missing from this listing, including the
// declarations of tv and tz, the remaining timeout fields and any
// lock/unlock of mutex.
410 void wait_for_storage_daemon_termination(JCR *jcr)
412 int cancel_count = 0;
413 /* Now wait for Storage daemon to terminate our message thread */
414 while (!jcr->sd_msg_thread_done) {
417 struct timespec timeout;
// Absolute deadline = current wall-clock time + 5 seconds.
// NOTE(review): the tv_nsec assignment is not visible here; it must be
// set before the wait - confirm.
419 gettimeofday(&tv, &tz);
421 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
422 Dmsg0(400, "I'm waiting for message thread termination.\n");
// NOTE(review): pthread_cond_timedwait requires the caller to hold mutex;
// the lock and unlock calls are presumably on lines not shown - confirm.
// msg_thread_cleanup broadcasts term_wait when the thread ends.
424 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
426 if (job_canceled(jcr)) {
// While the thread is still registered, flag the socket as timed out and
// terminated and send the thread TIMEOUT_SIGNAL (presumably to interrupt
// a blocking read - TODO confirm).
427 if (jcr->SD_msg_chan) {
428 jcr->store_bsock->set_timed_out();
429 jcr->store_bsock->set_terminated();
430 Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
431 pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
435 /* Give SD 30 seconds to clean up after cancel */
// Six wake-ups of 5 seconds each give the 30 seconds mentioned above; the
// increment of cancel_count and the action taken are on lines not shown.
436 if (cancel_count == 6) {
// Presumably reached after the loop - the closing braces are not visible.
440 set_jcr_job_status(jcr, JS_Terminated);
444 * Send bootstrap file to Storage daemon.
445 * This is used for restore, verify VolumeToCatalog, migration,
// Streams the file named by jcr->RestoreBootstrap to the Storage daemon,
// line by line, framed by the bootstrap command and an end-of-data signal.
//
// jcr - job control record; RestoreBootstrap and unlink_bsr are read
// sd  - socket to the Storage daemon
//
// The return statements, the declarations of bs and buf, and any fclose
// call are on lines not shown in this listing.
448 bool send_bootstrap_file(JCR *jcr, BSOCK *sd)
// Command that announces the bootstrap data; it holds no conversion
// specifiers, so using it as the fsend format below is harmless.
452 const char *bootstrap = "bootstrap\n";
454 Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap);
// Nothing to send when no bootstrap file name is set.
455 if (!jcr->RestoreBootstrap) {
458 bs = fopen(jcr->RestoreBootstrap, "rb");
// Reached when the file could not be opened (the test itself is on a line
// not shown): the job is failed and marked JS_ErrorTerminated.
461 Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
462 jcr->RestoreBootstrap, be.bstrerror());
463 set_jcr_job_status(jcr, JS_ErrorTerminated);
466 sd->fsend(bootstrap);
// File content goes through a fixed %s format so that any percent sign in
// the file is not interpreted by fsend.
467 while (fgets(buf, sizeof(buf), bs)) {
468 sd->fsend("%s", buf);
470 sd->signal(BNET_EOD);
// NOTE(review): no fclose(bs) is visible in this listing - confirm it
// exists on the missing line before this point.
// The file is removed after sending when unlink_bsr is set, and the flag
// is cleared.
472 if (jcr->unlink_bsr) {
473 unlink(jcr->RestoreBootstrap);
474 jcr->unlink_bsr = false;
// Thread body started by init_device_resources: connects to the Storage
// daemon with a control JCR and calls update_device_res for every Device
// resource.
// arg is not used by any visible line. Declarations, the definition of
// MAX_TRIES, and the cleanup and return are on lines not shown here.
483 extern "C" void *device_thread(void *arg)
// Detached: the creator never joins this thread.
490 pthread_detach(pthread_self());
491 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
// Up to MAX_TRIES connection attempts, each with retry_interval=10,
// max_retry_time=30 and verbose=1.
492 for (i=0; i < MAX_TRIES; i++) {
493 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
494 Dmsg0(900, "Failed connecting to SD.\n");
// A failure for one device is only logged at debug level.
498 foreach_res(dev, R_DEVICE) {
499 if (!update_device_res(jcr, dev)) {
500 Dmsg1(900, "Error updating device=%s\n", dev->name());
502 Dmsg1(900, "Updated Device=%s\n", dev->name());
// Close the socket and clear the pointer so the jcr no longer refers to
// the closed socket.
506 bnet_close(jcr->store_bsock);
507 jcr->store_bsock = NULL;
516 * Start a thread to handle getting Device resource information
517 * from SD. This is called once at startup of the Director.
// Starts device_thread in its own thread with a NULL argument.
// The declarations of status, thid and be are on lines not shown here.
519 void init_device_resources()
524 Dmsg0(100, "Start Device thread.\n");
525 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
// NOTE(review): the message text says message thread although the thread
// being created here is the device thread.
527 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));