2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2007 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation plus additions
11 that are listed in the file LICENSE.
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
02110-1301, USA.
23 Bacula® is a registered trademark of John Walker.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
30 * Bacula Director -- msgchan.c -- handles the message channel
31 * to the Storage daemon and the File daemon.
33 * Kern Sibbald, August MM
35 * This routine runs as a thread and must be thread reentrant.
37 * Basic tasks done here:
38 * Open a message channel with the Storage daemon
39 * to authenticate ourself and to pass the JobId.
40 * Create a thread to interact with the Storage daemon
41 * who returns a job status and requests Catalog services, etc.
/*
 * File-scope state and wire-format strings for the Director <-> Storage
 * daemon message channel.
 *
 * mutex is the mutex handed to pthread_cond_timedwait() in
 * wait_for_storage_daemon_termination().
 */
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
51 /* Commands sent to Storage daemon */
/* jobcmd: Job command formatted by start_storage_daemon_job(). */
52 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
53 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
/* use_storage: sent once per Storage; append is sent as 0 for the read side and 1 for the write side. */
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56 "pool_type=%s append=%d copy=%d stripe=%d\n";
/* use_device: sent once per alternative Device of a Storage. */
57 static char use_device[] = "use device=%s\n";
/*
 * NOTE(review): query_device is commented out below, yet update_device_res()
 * still passes it to bnet_fsend(). Presumably that function is compiled out
 * by a preprocessor guard that is not visible in this listing -- confirm.
 */
58 //static char query_device[] = _("query device=%s");
60 /* Response from Storage daemon */
/*
 * OKjob is used as a sscanf() format: %100s stores up to 100 characters plus
 * a terminating NUL, so the destination must hold at least 101 bytes.
 */
61 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
/* OK_device is used as a sscanf() format; its %s has no field width. */
62 static char OK_device[] = "3000 OK use device device=%s\n";
64 /* Storage Daemon requests */
/* Job_start / Job_end are the sscanf() formats matched in msg_thread(). */
65 static char Job_start[] = "3010 Job %127s start\n";
66 static char Job_end[] =
67 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
73 * Establish a message channel connection with the Storage daemon
74 * and perform authentication.
/*
 * connect_to_storage_daemon -- open and authenticate the message channel
 * to the Storage daemon for this job.
 *
 *   jcr             job control record; jcr->store_bsock receives the socket
 *   retry_interval  passed through to bnet_connect()
 *   max_retry_time  passed through to bnet_connect()
 *   verbose         passed through to bnet_connect()
 *
 * Returns true immediately when jcr->store_bsock is already set.
 * Otherwise the heartbeat interval is taken from the storage resource when
 * it is non-zero, or from the director resource. bnet_connect() is then
 * called, the socket is recorded in jcr->store_bsock, and
 * authenticate_storage_daemon() is run. jcr->store_bsock is reset to NULL
 * on the authentication-failure path.
 *
 * NOTE(review): the selection of `store`, the check of bnet_connect()'s
 * result and the failure/success return statements are not visible in this
 * listing -- confirm against the complete source.
 */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77 int max_retry_time, int verbose)
83 if (jcr->store_bsock) {
84 return true; /* already connected */
87 /* If there is a write storage use it */
/* A per-storage heartbeat interval, when non-zero, is the one used. */
94 if (store->heartbeat_interval) {
95 heart_beat = store->heartbeat_interval;
97 heart_beat = director->heartbeat_interval;
101 * Open message channel with the Storage daemon
103 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
105 sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat,
106 _("Storage daemon"), store->address,
107 NULL, store->SDport, verbose);
111 sd->res = (RES *)store; /* save pointer to other end */
/* The socket is published in the JCR before authentication is attempted. */
112 jcr->store_bsock = sd;
114 if (!authenticate_storage_daemon(jcr, store)) {
116 jcr->store_bsock = NULL;
123 * Here we ask the SD to send us the info for a
124 * particular device resource.
/*
 * update_device_res -- ask the Storage daemon for the information of one
 * device resource.
 *
 *   jcr  job control record used for the connection
 *   dev  device resource whose name (dev->hdr.name) is queried
 *
 * Connects with connect_to_storage_daemon(jcr, 5, 30, 0), sends the device
 * name (after bash_spaces()) using the query_device format, and reads one
 * reply with bget_dirmsg().
 *
 * NOTE(review): query_device is commented out at file scope, so this
 * function cannot compile as shown. Presumably it sits inside a disabled
 * preprocessor section that is not visible here -- confirm.
 * NOTE(review): the return statements are not visible in this listing.
 */
127 bool update_device_res(JCR *jcr, DEVICE *dev)
129 POOL_MEM device_name;
131 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
134 sd = jcr->store_bsock;
135 pm_strcpy(device_name, dev->hdr.name);
/* The name is passed through bash_spaces() before being placed in the command. */
136 bash_spaces(device_name);
137 bnet_fsend(sd, query_device, device_name.c_str());
138 Dmsg1(100, ">stored: %s\n", sd->msg);
139 /* The data is returned through Device_update */
140 if (bget_dirmsg(sd) <= 0) {
148 * Start a job with the Storage daemon
/*
 * start_storage_daemon_job -- start a job with the Storage daemon.
 *
 *   jcr     job control record; jcr->store_bsock must already be connected
 *   rstore  list of Storage resources to read from
 *   wstore  list of Storage resources to write to
 *
 * Steps visible in this function:
 *   1. When jcr->reschedule_count is non-zero, send "cancel Job=" for the
 *      previous incarnation and drain the replies.
 *   2. Send the Job command (jobcmd) and parse the OKjob reply into
 *      jcr->VolSessionId, jcr->VolSessionTime and auth_key; a copy of
 *      auth_key is stored in jcr->sd_auth_key. A reply that does not parse
 *      raises an M_FATAL job message.
 *   3. Read side: for each Storage in rstore send use_storage (append=0)
 *      followed by one use_device per alternative Device, a BNET_EOD after
 *      the devices and another after the storages, then parse the
 *      OK_device reply.
 *   4. Write side: same protocol for wstore with append=1.
 *   5. On failure an M_FATAL job message naming the device is issued.
 *
 * Fix applied: the read-side loop now copies storage->name() into
 * store_name before bash_spaces()/bnet_fsend(), exactly as the write-side
 * loop does. Previously store_name was never assigned on the read path, so
 * the Storage name in the "use storage=" command did not come from the
 * Storage being processed.
 *
 * NOTE(review): OKjob scans with %100s, which needs a destination of at
 * least 101 bytes; the declaration of auth_key is not visible here --
 * confirm its size.
 * NOTE(review): the local declarations, the else branches and the return
 * statements are not visible in this listing; they are left untouched.
 */
150 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
156 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
157 POOL_MEM job_name, client_name, fileset_name;
162 sd = jcr->store_bsock;
164 * Now send JobId and permissions, and get back the authorization key.
166 pm_strcpy(job_name, jcr->job->hdr.name);
167 bash_spaces(job_name);
168 pm_strcpy(client_name, jcr->client->hdr.name);
169 bash_spaces(client_name);
170 pm_strcpy(fileset_name, jcr->fileset->hdr.name);
171 bash_spaces(fileset_name);
/* An empty FileSet MD5 is replaced by a placeholder before it is sent. */
172 if (jcr->fileset->MD5[0] == 0) {
173 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
175 /* If rescheduling, cancel the previous incarnation of this job
176 * with the SD, which might be waiting on the FD connection.
177 * If we do not cancel it the SD will not accept a new connection
178 * for the same jobid.
180 if (jcr->reschedule_count) {
181 bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
182 while (bnet_recv(sd) >= 0)
185 bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
186 job_name.c_str(), client_name.c_str(),
187 jcr->JobType, jcr->JobLevel,
188 fileset_name.c_str(), !jcr->pool->catalog_files,
189 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
190 jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
191 Dmsg1(100, ">stored: %s\n", sd->msg);
192 if (bget_dirmsg(sd) > 0) {
193 Dmsg1(100, "<stored: %s", sd->msg);
194 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
195 &jcr->VolSessionTime, &auth_key) != 3) {
196 Dmsg1(100, "BadJob=%s\n", sd->msg);
197 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
200 jcr->sd_auth_key = bstrdup(auth_key);
201 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
204 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
210 * We have two loops here. The first comes from the
211 * Storage = associated with the Job, and we need
212 * to attach to each one.
213 * The inner loop loops over all the alternative devices
214 * associated with each Storage. It selects the first
218 /* Do read side of storage daemon */
220 /* For the moment, only migrate has rpool */
221 if (jcr->JobType == JT_MIGRATE) {
222 pm_strcpy(pool_type, jcr->rpool->pool_type);
223 pm_strcpy(pool_name, jcr->rpool->name());
225 pm_strcpy(pool_type, jcr->pool->pool_type);
226 pm_strcpy(pool_name, jcr->pool->name());
228 bash_spaces(pool_type);
229 bash_spaces(pool_name);
230 foreach_alist(storage, rstore) {
231 Dmsg1(100, "Rstore=%s\n", storage->name());
/* Load the current Storage's name; without this store_name is never set on the read side. */
pm_strcpy(store_name, storage->name());
232 bash_spaces(store_name);
233 pm_strcpy(media_type, storage->media_type);
234 bash_spaces(media_type);
235 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
236 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
237 Dmsg1(100, "rstore >stored: %s", sd->msg);
239 /* Loop over alternative storage Devices until one is OK */
240 foreach_alist(dev, storage->device) {
241 pm_strcpy(device_name, dev->hdr.name);
242 bash_spaces(device_name);
243 bnet_fsend(sd, use_device, device_name.c_str());
244 Dmsg1(100, ">stored: %s", sd->msg);
246 bnet_sig(sd, BNET_EOD); /* end of Devices */
248 bnet_sig(sd, BNET_EOD); /* end of Storages */
249 if (bget_dirmsg(sd) > 0) {
250 Dmsg1(100, "<stored: %s", sd->msg);
251 /* ****FIXME**** save actual device name */
/* NOTE(review): OK_device's %s has no field width, so the write into device_name's buffer is bounded only by the reply length -- confirm it always fits. */
252 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
258 /* Do write side of storage daemon */
260 pm_strcpy(pool_type, jcr->pool->pool_type);
261 pm_strcpy(pool_name, jcr->pool->name());
262 bash_spaces(pool_type);
263 bash_spaces(pool_name);
264 foreach_alist(storage, wstore) {
265 pm_strcpy(store_name, storage->name());
266 bash_spaces(store_name);
267 pm_strcpy(media_type, storage->media_type);
268 bash_spaces(media_type);
269 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
270 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
272 Dmsg1(100, "wstore >stored: %s", sd->msg);
274 /* Loop over alternative storage Devices until one is OK */
275 foreach_alist(dev, storage->device) {
276 pm_strcpy(device_name, dev->hdr.name);
277 bash_spaces(device_name);
278 bnet_fsend(sd, use_device, device_name.c_str());
279 Dmsg1(100, ">stored: %s", sd->msg);
281 bnet_sig(sd, BNET_EOD); /* end of Devices */
283 bnet_sig(sd, BNET_EOD); /* end of Storages */
284 if (bget_dirmsg(sd) > 0) {
285 Dmsg1(100, "<stored: %s", sd->msg);
286 /* ****FIXME**** save actual device name */
287 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
/* The SD's message is copied into err_msg before Jmsg() is called with it. */
295 pm_strcpy(err_msg, sd->msg); /* save message */
296 Jmsg(jcr, M_FATAL, 0, _("\n"
297 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
298 device_name.c_str(), err_msg.c_str()/* sd->msg */);
300 Jmsg(jcr, M_FATAL, 0, _("\n"
301 " Storage daemon didn't accept Device \"%s\" command.\n"),
302 device_name.c_str());
305 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
311 * Start a thread to handle Storage daemon messages and
 * Catalog requests.
/*
 * start_storage_daemon_message_thread -- launch msg_thread() for this job
 * and wait until it has started.
 *
 *   jcr  job control record handed to the new thread as its argument
 *
 * The JCR use count is incremented on behalf of the new thread, and
 * jcr->sd_msg_thread_done and jcr->SD_msg_chan are reset before
 * pthread_create() is called. A creation failure is reported with an
 * M_ABORT job message. The caller then loops until jcr->SD_msg_chan is
 * non-zero, i.e. until msg_thread() has stored its thread id there,
 * checking for cancellation or early thread termination on each pass.
 *
 * NOTE(review): the loop's delay call, the action taken when the job is
 * canceled and the return statements are not visible in this listing.
 */
314 bool start_storage_daemon_message_thread(JCR *jcr)
319 jcr->inc_use_count(); /* mark in use by msg thread */
320 jcr->sd_msg_thread_done = false;
321 jcr->SD_msg_chan = 0;
322 Dmsg0(100, "Start SD msg_thread.\n");
323 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
325 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
327 /* Wait for thread to start */
328 while (jcr->SD_msg_chan == 0) {
/* Stop waiting if the job was canceled or the thread has already finished. */
330 if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
334 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
/*
 * msg_thread_cleanup -- cleanup handler registered by msg_thread() with
 * pthread_cleanup_push().
 *
 *   arg  the JCR pointer that was given to msg_thread()
 *
 * Ends any open catalog transaction, marks the message thread as done,
 * clears jcr->SD_msg_chan, wakes every thread waiting on jcr->term_wait
 * and finally calls free_jcr() on the JCR.
 */
338 extern "C" void msg_thread_cleanup(void *arg)
340 JCR *jcr = (JCR *)arg;
341 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
/* Set the done flag before broadcasting so woken waiters observe it. */
342 jcr->sd_msg_thread_done = true;
343 jcr->SD_msg_chan = 0;
344 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
345 Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
346 free_jcr(jcr); /* release jcr */
350 * Handle the message channel (i.e. requests from the
 * Storage daemon).
352 * Note, we are running in a separate thread.
/*
 * msg_thread -- thread body that services the Storage daemon message
 * channel for one job.
 *
 *   arg  the JCR pointer of the job
 *
 * The thread detaches itself, publishes its thread id in jcr->SD_msg_chan
 * and registers msg_thread_cleanup() as its cleanup handler. It then reads
 * messages with bget_dirmsg() until the job is canceled or the read returns
 * a negative value. Each message is matched against Job_start and Job_end;
 * a Job_end message supplies jcr->SDJobStatus, jcr->SDJobFiles and
 * jcr->SDJobBytes. After the loop a socket error sets jcr->SDJobStatus to
 * JS_ErrorTerminated, and the cleanup handler is popped and executed.
 *
 * NOTE(review): Job is declared with MAX_NAME_LENGTH bytes while the scan
 * formats use %127s; this is safe only if MAX_NAME_LENGTH is at least 128,
 * which is not visible here -- confirm.
 * NOTE(review): the remaining local declarations, the expected match count
 * for Job_end and the return statement are not visible in this listing.
 */
354 extern "C" void *msg_thread(void *arg)
356 JCR *jcr = (JCR *)arg;
359 char Job[MAX_NAME_LENGTH];
364 pthread_detach(pthread_self());
/* start_storage_daemon_message_thread() waits for this value to become non-zero. */
365 jcr->SD_msg_chan = pthread_self();
366 pthread_cleanup_push(msg_thread_cleanup, arg);
367 sd = jcr->store_bsock;
369 /* Read the Storage daemon's output.
371 Dmsg0(100, "Start msg_thread loop\n");
372 while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
373 Dmsg1(400, "<stored: %s", sd->msg);
374 if (sscanf(sd->msg, Job_start, Job) == 1) {
377 if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
379 jcr->SDJobStatus = JobStatus; /* termination status */
380 jcr->SDJobFiles = JobFiles;
381 jcr->SDJobBytes = JobBytes;
384 Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
386 if (is_bnet_error(sd)) {
387 jcr->SDJobStatus = JS_ErrorTerminated;
389 pthread_cleanup_pop(1); /* remove and execute the handler */
/*
 * wait_for_storage_daemon_termination -- block until the Storage daemon
 * message thread of this job has finished.
 *
 *   jcr  job control record whose sd_msg_thread_done flag is polled
 *
 * Loops while jcr->sd_msg_thread_done is false. Each pass waits on
 * jcr->term_wait with a deadline five seconds ahead of the current time.
 * If the job has been canceled and the message thread is still registered
 * in jcr->SD_msg_chan, the SD socket is flagged timed_out/terminated and
 * TIMEOUT_SIGNAL is sent to that thread with pthread_kill(). cancel_count
 * is compared against 6, which the existing comment describes as a
 * 30-second allowance. The job status is set to JS_Terminated at the end.
 *
 * NOTE(review): pthread_cond_timedwait() requires the mutex to be locked
 * by the caller and timeout.tv_nsec to be initialised; neither the
 * lock/unlock calls nor a tv_nsec assignment is visible in this listing --
 * confirm against the complete source.
 * NOTE(review): the increment of cancel_count and the statement executed
 * when it reaches 6 are not visible here.
 */
393 void wait_for_storage_daemon_termination(JCR *jcr)
395 int cancel_count = 0;
396 /* Now wait for Storage daemon to terminate our message thread */
397 while (!jcr->sd_msg_thread_done) {
400 struct timespec timeout;
402 gettimeofday(&tv, &tz);
404 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
405 Dmsg0(400, "I'm waiting for message thread termination.\n");
407 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
409 if (job_canceled(jcr)) {
/* Only signal the thread while it is still registered in SD_msg_chan. */
410 if (jcr->SD_msg_chan) {
411 jcr->store_bsock->timed_out = 1;
412 jcr->store_bsock->terminated = 1;
413 Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
414 pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
418 /* Give SD 30 seconds to clean up after cancel */
419 if (cancel_count == 6) {
423 set_jcr_job_status(jcr, JS_Terminated);
/*
 * device_thread -- thread body that refreshes every Device resource from
 * the Storage daemon.
 *
 *   arg  not referenced in the visible lines
 *
 * The thread detaches itself and creates a control JCR named
 * "*DeviceInit*" of type JT_SYSTEM. It makes up to MAX_TRIES attempts to
 * connect with connect_to_storage_daemon(jcr, 10, 30, 1). It calls
 * update_device_res() for each R_DEVICE resource, then closes the SD
 * socket and clears jcr->store_bsock.
 *
 * NOTE(review): the handling after a failed connect, the loop exit, the
 * release of the control JCR and the return statement are not visible in
 * this listing -- confirm against the complete source.
 */
429 extern "C" void *device_thread(void *arg)
436 pthread_detach(pthread_self());
437 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
438 for (i=0; i < MAX_TRIES; i++) {
439 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
440 Dmsg0(900, "Failed connecting to SD.\n");
/* Each Device resource is updated individually; the outcome is only logged at debug level in the visible lines. */
444 foreach_res(dev, R_DEVICE) {
445 if (!update_device_res(jcr, dev)) {
446 Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
448 Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
452 bnet_close(jcr->store_bsock);
453 jcr->store_bsock = NULL;
462 * Start a thread to handle getting Device resource information
463 * from SD. This is called once at startup of the Director.
465 void init_device_resources()
470 Dmsg0(100, "Start Device thread.\n");
471 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
473 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));