2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
30 * Bacula Director -- msgchan.c -- handles the message channel
31 * to the Storage daemon and the File daemon.
33 * Kern Sibbald, August MM
35 * This routine runs as a thread and must be thread reentrant.
37 * Basic tasks done here:
38 * Open a message channel with the Storage daemon
39 * to authenticate ourselves and to pass the JobId.
40 * Create a thread to interact with the Storage daemon
41 * who returns a job status and requests Catalog services, etc.
// File-scope mutex; it is the mutex argument handed to pthread_cond_timedwait()
// in wait_for_storage_daemon_termination().
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
51 /* Commands sent to Storage daemon */
// Job command sent by start_storage_daemon_job(). JobId and SpoolSize are
// formatted as strings (%s); the caller converts them with edit_int64().
52 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
53 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s\n";
// One "use storage" line is sent per Storage; append= is 0 on the read side
// and 1 on the write side in start_storage_daemon_job().
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56 "pool_type=%s append=%d copy=%d stripe=%d\n";
// One "use device" line is sent per alternative Device of a Storage.
57 static char use_device[] = "use device=%s\n";
// query_device is disabled here, yet update_device_res() still references it.
58 //static char query_device[] = _("query device=%s");
60 /* Response from Storage daemon */
// sscanf() pattern for the reply to jobcmd.
// NOTE(review): %100s can store 100 characters plus a terminating NUL, so the
// destination must hold at least 101 bytes - confirm the size of auth_key.
61 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
// sscanf() pattern for the reply to the use storage / use device sequence.
// NOTE(review): the %s conversion here carries no field width.
62 static char OK_device[] = "3000 OK use device device=%s\n";
64 /* Storage Daemon requests */
// Patterns matched by msg_thread() against incoming Storage daemon messages.
// NOTE(review): %127s needs a 128-byte destination; msg_thread() scans into
// char Job[MAX_NAME_LENGTH] - confirm MAX_NAME_LENGTH is at least 128.
65 static char Job_start[] = "3010 Job %127s start\n";
// NOTE(review): JobFiles is scanned with %d although msg_thread() declares it
// as uint32_t; %u would match that declaration.
66 static char Job_end[] =
67 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";
69 /* Forward referenced functions */
// C linkage because the function is passed to pthread_create().
70 extern "C" void *msg_thread(void *arg);
73 * Establish a message channel connection with the Storage daemon
74 * and perform authentication.
// Open the message channel to the Storage daemon for this job and
// authenticate over it.
//
// jcr             job control record; on success the socket is stored in
//                 jcr->store_bsock
// retry_interval, max_retry_time, verbose
//                 forwarded unchanged to bnet_connect()
//
// Returns true at once when jcr->store_bsock is already set.
// NOTE(review): this view of the function is missing lines (local
// declarations, selection of "store", closing braces and the remaining
// return statements); the comments describe only the visible statements.
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77 int max_retry_time, int verbose)
83 if (jcr->store_bsock) {
84 return true; /* already connected */
87 /* If there is a write storage use it */
// A non-zero heartbeat interval on the storage resource is used; the second
// assignment takes the director's value (presumably the else branch - the
// else line itself is not visible here).
94 if (store->heartbeat_interval) {
95 heart_beat = store->heartbeat_interval;
97 heart_beat = director->heartbeat_interval;
101 * Open message channel with the Storage daemon
103 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
105 sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat,
106 _("Storage daemon"), store->address,
107 NULL, store->SDport, verbose);
// The socket is published in the JCR before authentication, so
// authenticate_storage_daemon() can reach it through jcr.
111 sd->res = (RES *)store; /* save pointer to other end */
112 jcr->store_bsock = sd;
114 if (!authenticate_storage_daemon(jcr, store)) {
// Authentication failed: the JCR no longer refers to the socket (whether the
// socket is closed first is not visible in this view).
116 jcr->store_bsock = NULL;
123 * Here we ask the SD to send us the info for a
124 * particular device resource.
// Ask the Storage daemon for the information of one Device resource.
//
// Connects through connect_to_storage_daemon(jcr, 5, 30, 0), copies the
// device name into a POOL_MEM, passes it through bash_spaces(), sends it with
// the query_device format and then reads one reply with bget_dirmsg().
// NOTE(review): the only visible definition of query_device is commented out
// near the top of the file, so this function presumably sits inside a
// disabled preprocessor section - confirm before relying on it.
// NOTE(review): return statements and closing braces are not visible here.
127 bool update_device_res(JCR *jcr, DEVICE *dev)
129 POOL_MEM device_name;
131 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
134 sd = jcr->store_bsock;
135 pm_strcpy(device_name, dev->name());
136 bash_spaces(device_name);
137 sd->fsend(query_device, device_name.c_str());
138 Dmsg1(100, ">stored: %s\n", sd->msg);
139 /* The data is returned through Device_update */
// The body of the branch taken when bget_dirmsg() returns zero or less is
// not visible in this view; presumably it is the failure path.
140 if (bget_dirmsg(sd) <= 0) {
// Reply expected from the Storage daemon after the bootstrap file is sent;
// start_storage_daemon_job() passes it to response().
147 static char OKbootstrap[] = "3000 OK bootstrap\n";
150 * Start a job with the Storage daemon
// Start a job with the Storage daemon over jcr->store_bsock.
//
// Visible steps:
//   1. Send the jobcmd line and parse the OKjob reply into
//      jcr->VolSessionId, jcr->VolSessionTime and the authorization key,
//      which is duplicated into jcr->sd_auth_key.
//   2. When send_bsr is set, send the bootstrap file and require the
//      OKbootstrap response.
//   3. Send "use storage" and "use device" lines for the entries of rstore
//      (append=0), then for the entries of wstore (append=1); BNET_EOD
//      signals mark the end of Devices and the end of Storages, and the
//      reply is parsed with OK_device.
//
// jcr       job control record of the job being started
// rstore    list of Storage resources for the read side
// wstore    list of Storage resources for the write side
// send_bsr  true to send the bootstrap file after the Job command
//
// NOTE(review): this view is missing lines (declarations of sd, auth_key,
// copy, stripe, ok, err_msg; closing braces; else lines; return
// statements). Comments below describe only the statements that are visible.
152 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
158 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
159 POOL_MEM job_name, client_name, fileset_name;
162 char ed1[30], ed2[30];
164 sd = jcr->store_bsock;
166 * Now send JobId and permissions, and get back the authorization key.
// Each name is copied into a POOL_MEM and passed through bash_spaces()
// before it is placed on the command line.
168 pm_strcpy(job_name, jcr->job->name());
169 bash_spaces(job_name);
170 pm_strcpy(client_name, jcr->client->name());
171 bash_spaces(client_name);
172 pm_strcpy(fileset_name, jcr->fileset->name());
173 bash_spaces(fileset_name);
// An empty FileSet MD5 is replaced by placeholder text before it is sent as
// the FileSetMD5 field of jobcmd.
174 if (jcr->fileset->MD5[0] == 0) {
175 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
177 /* If rescheduling, cancel the previous incarnation of this job
178 * with the SD, which might be waiting on the FD connection.
179 * If we do not cancel it the SD will not accept a new connection
180 * for the same jobid.
182 if (jcr->reschedule_count) {
183 sd->fsend("cancel Job=%s\n", jcr->Job);
// Reads replies to the cancel command until recv() returns a negative value
// (the loop body is not visible in this view).
184 while (sd->recv() >= 0)
// NoAttr is the negation of the pool's catalog_files setting.
187 sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
188 job_name.c_str(), client_name.c_str(),
189 jcr->get_JobType(), jcr->get_JobLevel(),
190 fileset_name.c_str(), !jcr->pool->catalog_files,
191 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
192 jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
193 edit_int64(jcr->spool_size, ed2));
194 Dmsg1(100, ">stored: %s", sd->msg);
195 if (bget_dirmsg(sd) > 0) {
196 Dmsg1(100, "<stored: %s", sd->msg);
// All three fields of OKjob must convert, otherwise the job is failed with
// M_FATAL.
// NOTE(review): OKjob uses %100s, which can store 101 bytes including the
// NUL; the declaration of auth_key is not visible here - confirm its size.
197 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
198 &jcr->VolSessionTime, &auth_key) != 3) {
199 Dmsg1(100, "BadJob=%s\n", sd->msg);
200 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
// The JCR keeps its own copy of the key made with bstrdup().
203 jcr->sd_auth_key = bstrdup(auth_key);
204 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
207 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
// The bootstrap step is skipped entirely when send_bsr is false; when it
// runs, both the send and the OKbootstrap response must succeed.
212 if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
213 !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
218 * We have two loops here. The first comes from the
219 * Storage = associated with the Job, and we need
220 * to attach to each one.
221 * The inner loop loops over all the alternative devices
222 * associated with each Storage. It selects the first
226 /* Do read side of storage daemon */
228 /* For the moment, only migrate, copy and vbackup have rpool */
// Migrate, copy and virtual-full backup jobs read from jcr->rpool; the
// second pair of assignments uses jcr->pool (presumably the else branch).
229 if (jcr->get_JobType() == JT_MIGRATE || jcr->get_JobType() == JT_COPY ||
230 (jcr->get_JobType() == JT_BACKUP && jcr->get_JobLevel() == L_VIRTUAL_FULL)) {
231 pm_strcpy(pool_type, jcr->rpool->pool_type);
232 pm_strcpy(pool_name, jcr->rpool->name());
234 pm_strcpy(pool_type, jcr->pool->pool_type);
235 pm_strcpy(pool_name, jcr->pool->name());
237 bash_spaces(pool_type);
238 bash_spaces(pool_name);
239 foreach_alist(storage, rstore) {
240 Dmsg1(100, "Rstore=%s\n", storage->name());
241 pm_strcpy(store_name, storage->name());
242 bash_spaces(store_name);
243 pm_strcpy(media_type, storage->media_type);
244 bash_spaces(media_type);
// The literal 0 fills append= in use_storage for the read side.
245 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
246 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
247 Dmsg1(100, "rstore >stored: %s", sd->msg);
249 /* Loop over alternative storage Devices until one is OK */
250 foreach_alist(dev, storage->device) {
251 pm_strcpy(device_name, dev->name());
252 bash_spaces(device_name);
253 sd->fsend(use_device, device_name.c_str());
254 Dmsg1(100, ">stored: %s", sd->msg);
256 sd->signal(BNET_EOD); /* end of Devices */
258 sd->signal(BNET_EOD); /* end of Storages */
259 if (bget_dirmsg(sd) > 0) {
260 Dmsg1(100, "<stored: %s", sd->msg);
261 /* ****FIXME**** save actual device name */
// NOTE(review): the %s in OK_device has no field width and writes straight
// into the buffer returned by device_name.c_str(); a long reply could
// exceed that buffer - confirm its capacity or bound the conversion.
262 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
268 /* Do write side of storage daemon */
// The write side always takes its pool from jcr->pool.
270 pm_strcpy(pool_type, jcr->pool->pool_type);
271 pm_strcpy(pool_name, jcr->pool->name());
272 bash_spaces(pool_type);
273 bash_spaces(pool_name);
274 foreach_alist(storage, wstore) {
275 pm_strcpy(store_name, storage->name());
276 bash_spaces(store_name);
277 pm_strcpy(media_type, storage->media_type);
278 bash_spaces(media_type);
// The literal 1 fills append= in use_storage for the write side.
279 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
280 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
282 Dmsg1(100, "wstore >stored: %s", sd->msg);
284 /* Loop over alternative storage Devices until one is OK */
285 foreach_alist(dev, storage->device) {
286 pm_strcpy(device_name, dev->name());
287 bash_spaces(device_name);
288 sd->fsend(use_device, device_name.c_str());
289 Dmsg1(100, ">stored: %s", sd->msg);
291 sd->signal(BNET_EOD); /* end of Devices */
293 sd->signal(BNET_EOD); /* end of Storages */
294 if (bget_dirmsg(sd) > 0) {
295 Dmsg1(100, "<stored: %s", sd->msg);
296 /* ****FIXME**** save actual device name */
// NOTE(review): same unbounded %s write into device_name as on the read
// side.
297 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
// The reply text is copied into err_msg before Jmsg() is called, and the
// copy is what gets reported.
305 pm_strcpy(err_msg, sd->msg); /* save message */
306 Jmsg(jcr, M_FATAL, 0, _("\n"
307 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
308 device_name.c_str(), err_msg.c_str()/* sd->msg */);
310 Jmsg(jcr, M_FATAL, 0, _("\n"
311 " Storage daemon didn't accept Device \"%s\" command.\n"),
312 device_name.c_str());
315 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
321 * Start a thread to handle Storage daemon messages and
// Create the msg_thread() thread for this job and wait until it has
// published its thread id in jcr->SD_msg_chan.
//
// jcr  job control record handed to the new thread as its argument
//
// NOTE(review): declarations, return statements and part of the wait loop
// are not visible in this view.
324 bool start_storage_daemon_message_thread(JCR *jcr)
// The extra use count is taken before the thread exists; msg_thread_cleanup()
// calls free_jcr(), which presumably releases it.
329 jcr->inc_use_count(); /* mark in use by msg thread */
330 jcr->sd_msg_thread_done = false;
331 jcr->SD_msg_chan = 0;
332 Dmsg0(100, "Start SD msg_thread.\n");
// A pthread_create() failure is reported with M_ABORT.
333 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
335 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
337 /* Wait for thread to start */
// msg_thread() stores pthread_self() in SD_msg_chan, which ends this loop.
// NOTE(review): SD_msg_chan and sd_msg_thread_done are written by the other
// thread and read here with no visible lock - confirm how they are declared
// and what the missing loop line (presumably a short sleep) does.
338 while (jcr->SD_msg_chan == 0) {
// Stop waiting if the job was canceled or the thread has already finished.
340 if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
344 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
// Cleanup handler that msg_thread() registers with pthread_cleanup_push().
//
// arg  the JCR pointer that was given to msg_thread()
//
// Ends any open catalog transaction, marks the message thread as done,
// clears the published thread id, wakes threads waiting on jcr->term_wait,
// releases the JCR and finally removes the thread-specific database data.
348 extern "C" void msg_thread_cleanup(void *arg)
350 JCR *jcr = (JCR *)arg;
351 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
// These two fields are the ones wait_for_storage_daemon_termination() tests.
352 jcr->sd_msg_thread_done = true;
353 jcr->SD_msg_chan = 0;
354 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
// jcr is still dereferenced on the next line, so free_jcr() must come after.
355 Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
356 free_jcr(jcr); /* release jcr */
357 db_thread_cleanup(); /* remove thread specific data */
361 * Handle the message channel (i.e. requests from the
363 * Note, we are running in a separate thread.
// Thread body that reads Storage daemon messages for one job.
//
// arg  the JCR pointer passed by start_storage_daemon_message_thread()
//
// The thread detaches itself, publishes its id in jcr->SD_msg_chan,
// registers msg_thread_cleanup() and then loops on bget_dirmsg() until the
// job is canceled or bget_dirmsg() returns a negative value.
// NOTE(review): declarations of sd, JobStatus and JobBytes, the action taken
// for a Job_start message, closing braces and the return statement are not
// visible in this view.
365 extern "C" void *msg_thread(void *arg)
367 JCR *jcr = (JCR *)arg;
370 char Job[MAX_NAME_LENGTH];
371 uint32_t JobFiles, JobErrors;
374 pthread_detach(pthread_self());
// A non-zero SD_msg_chan is what start_storage_daemon_message_thread()
// waits for.
376 jcr->SD_msg_chan = pthread_self();
377 pthread_cleanup_push(msg_thread_cleanup, arg);
378 sd = jcr->store_bsock;
380 /* Read the Storage daemon's output.
382 Dmsg0(100, "Start msg_thread loop\n");
383 while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
384 Dmsg1(400, "<stored: %s", sd->msg);
// Job_start has a single conversion, the job name.
385 if (sscanf(sd->msg, Job_start, Job) == 1) {
// A full Job_end match (all five fields) copies the Storage daemon's final
// status and counters into the JCR.
// NOTE(review): Job_end scans JobFiles with %d although it is declared
// uint32_t above; %u would match the declaration.
388 if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
389 &JobBytes, &JobErrors) == 5) {
390 jcr->SDJobStatus = JobStatus; /* termination status */
391 jcr->SDJobFiles = JobFiles;
392 jcr->SDJobBytes = JobBytes;
393 jcr->SDErrors = JobErrors;
396 Dmsg1(400, "end loop use=%d\n", jcr->use_count());
// A socket error after the loop overrides the Storage daemon job status.
398 if (is_bnet_error(sd)) {
399 jcr->SDJobStatus = JS_ErrorTerminated;
// The argument 1 makes pthread_cleanup_pop() run msg_thread_cleanup() now.
401 pthread_cleanup_pop(1); /* remove and execute the handler */
// Wait until the message thread reports completion through
// jcr->sd_msg_thread_done, then set the job status to JS_Terminated.
//
// jcr  job control record whose message thread is being waited for
//
// Each pass waits up to 5 seconds on jcr->term_wait, which
// msg_thread_cleanup() broadcasts. If the job has been canceled and the
// thread id is still published, the socket is flagged as timed out and
// terminated, and TIMEOUT_SIGNAL is sent to the thread.
// NOTE(review): declarations of tv and tz, the timeout.tv_nsec assignment,
// the update of cancel_count and the body of the cancel_count test are not
// visible in this view.
405 void wait_for_storage_daemon_termination(JCR *jcr)
407 int cancel_count = 0;
408 /* Now wait for Storage daemon to terminate our message thread */
409 while (!jcr->sd_msg_thread_done) {
412 struct timespec timeout;
// The absolute deadline is built from the current wall-clock time.
414 gettimeofday(&tv, &tz);
416 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
417 Dmsg0(400, "I'm waiting for message thread termination.\n");
// NOTE(review): pthread_cond_timedwait() requires the mutex to be locked by
// the calling thread; the lock and unlock calls are not visible here -
// confirm they surround this call.
419 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
421 if (job_canceled(jcr)) {
// SD_msg_chan is non-zero only while msg_thread() is running; it is cleared
// in msg_thread_cleanup().
422 if (jcr->SD_msg_chan) {
423 jcr->store_bsock->set_timed_out();
424 jcr->store_bsock->set_terminated();
425 Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
426 pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
430 /* Give SD 30 seconds to clean up after cancel */
// 6 passes of 5 seconds give the 30 seconds named above, presumably with
// cancel_count incremented once per pass.
431 if (cancel_count == 6) {
435 set_jcr_job_status(jcr, JS_Terminated);
// Thread body started by init_device_resources(); it updates every Device
// resource from the Storage daemon.
//
// arg  not used by any visible statement (the creator passes NULL)
//
// The thread detaches itself, creates a control JCR named "*DeviceInit*"
// and makes up to MAX_TRIES attempts to connect with
// connect_to_storage_daemon(jcr, 10, 30, 1). It then calls
// update_device_res() for each R_DEVICE resource and closes the socket.
// NOTE(review): declarations, the handling after a failed connect, closing
// braces and the end of the function are not visible in this view.
441 extern "C" void *device_thread(void *arg)
448 pthread_detach(pthread_self());
449 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
450 for (i=0; i < MAX_TRIES; i++) {
451 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
452 Dmsg0(900, "Failed connecting to SD.\n");
456 foreach_res(dev, R_DEVICE) {
// The only visible reaction to a failed update is a debug message.
457 if (!update_device_res(jcr, dev)) {
458 Dmsg1(900, "Error updating device=%s\n", dev->name());
460 Dmsg1(900, "Updated Device=%s\n", dev->name());
// Clearing store_bsock makes a later connect_to_storage_daemon() call open a
// new connection instead of returning early.
464 bnet_close(jcr->store_bsock);
465 jcr->store_bsock = NULL;
474 * Start a thread to handle getting Device resource information
475 * from SD. This is called once at startup of the Director.
// Create the thread that runs device_thread(), passing it a NULL argument.
// A pthread_create() failure is reported through Jmsg1() with M_ABORT.
// NOTE(review): declarations of status, thid and be are not visible in this
// view.
// NOTE(review): the error text says "message thread" although the thread
// being created is device_thread - it looks copied from
// start_storage_daemon_message_thread(); consider correcting the wording.
477 void init_device_resources()
482 Dmsg0(100, "Start Device thread.\n");
483 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
485 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));