2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
30 * Bacula Director -- msgchan.c -- handles the message channel
31 * to the Storage daemon and the File daemon.
33 * Kern Sibbald, August MM
35 * This routine runs as a thread and must be thread reentrant.
37 * Basic tasks done here:
38 * Open a message channel with the Storage daemon
39 * to authenticate ourselves and to pass the JobId.
40 * Create a thread to interact with the Storage daemon
41 * who returns a job status and requests Catalog services, etc.
// File-scope mutex; wait_for_storage_daemon_termination() passes it to
// pthread_cond_timedwait() together with jcr->term_wait.
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
51 /* Commands sent to Storage daemon */
// jobcmd: sent by start_storage_daemon_job() to open the job on the SD.
// Name-like fields are sent after bash_spaces() has been applied to them.
52 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
53 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s\n";
// use_storage: sent once per Storage; start_storage_daemon_job() passes
// append=0 for read Storages and append=1 for write Storages. Each one is
// followed by one use_device line per candidate Device.
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56 "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
// NOTE(review): query_device is commented out, yet update_device_res()
//   passes it to sd->fsend(). Confirm that function is excluded from the
//   build, or that query_device is defined elsewhere.
58 //static char query_device[] = _("query device=%s");
60 /* Response from Storage daemon */
// NOTE(review): %100s stores up to 100 characters plus a terminating NUL
//   (101 bytes). Confirm the auth_key buffer that start_storage_daemon_job()
//   scans into is at least that large.
61 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
// NOTE(review): %s has no field width. Callers scan into device_name's
//   POOL_MEM buffer, so an over-long device name in the SD reply could
//   overflow it. Consider a bounded width.
62 static char OK_device[] = "3000 OK use device device=%s\n";
64 /* Storage Daemon requests */
// Parsed by msg_thread() into char Job[MAX_NAME_LENGTH]. %127s needs at
// least 128 bytes, so this presumably relies on MAX_NAME_LENGTH >= 128
// (confirm). lld is presumably a macro for the platform's 64-bit length
// modifier (confirm).
65 static char Job_start[] = "3010 Job %127s start\n";
66 static char Job_end[] =
67 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
73 * Establish a message channel connection with the Storage daemon
74 * and perform authentication.
// connect_to_storage_daemon:
//   retry_interval, max_retry_time, verbose - passed unchanged to bnet_connect().
//   Returns true at once if jcr->store_bsock is already set (already connected).
//   On success the new socket is stored in jcr->store_bsock. If
//   authenticate_storage_daemon() fails, jcr->store_bsock is reset to NULL.
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77 int max_retry_time, int verbose)
83 if (jcr->store_bsock) {
84 return true; /* already connected */
87 /* If there is a write storage use it */
// A Storage-specific heartbeat interval takes precedence over the
// Director's default heartbeat interval.
94 if (store->heartbeat_interval) {
95 heart_beat = store->heartbeat_interval;
97 heart_beat = director->heartbeat_interval;
101 * Open message channel with the Storage daemon
103 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
105 sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat,
106 _("Storage daemon"), store->address,
107 NULL, store->SDport, verbose);
// Record the Storage resource on the socket and publish the socket on
// the JCR before authenticating.
111 sd->res = (RES *)store; /* save pointer to other end */
112 jcr->store_bsock = sd;
114 if (!authenticate_storage_daemon(jcr, store)) {
116 jcr->store_bsock = NULL;
123 * Here we ask the SD to send us the info for a
124 * particular device resource.
// update_device_res:
//   Connects if needed, with retry interval 5, max retry time 30 and
//   verbose off. It then sends the device's name to the SD and reads one
//   reply with bget_dirmsg().
127 bool update_device_res(JCR *jcr, DEVICE *dev)
129 POOL_MEM device_name;
131 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
134 sd = jcr->store_bsock;
// bash_spaces() presumably encodes embedded spaces so the name is parsed
// as a single token by the SD (confirm).
135 pm_strcpy(device_name, dev->name());
136 bash_spaces(device_name);
// NOTE(review): query_device's file-scope definition is commented out in
//   this file. Confirm this function is compiled out, or that the name is
//   defined elsewhere.
137 sd->fsend(query_device, device_name.c_str());
138 Dmsg1(100, ">stored: %s\n", sd->msg);
139 /* The data is returned through Device_update */
140 if (bget_dirmsg(sd) <= 0) {
148 * Start a job with the Storage daemon
// start_storage_daemon_job:
//   rstore - Storage resources used for reading.
//   wstore - Storage resources used for writing.
//   Steps:
//   1. Send jobcmd on jcr->store_bsock and parse the SD's OKjob reply into
//      jcr->VolSessionId, jcr->VolSessionTime and jcr->sd_auth_key (a
//      bstrdup() copy of the key).
//   2. For each side, send use_storage/use_device lists and parse the SD's
//      OK_device reply.
//   A rejected or malformed job reply is reported with Jmsg(M_FATAL).
//   In jobcmd, NoAttr is sent as !jcr->pool->catalog_files.
150 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
156 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
157 POOL_MEM job_name, client_name, fileset_name;
160 char ed1[30], ed2[30];
162 sd = jcr->store_bsock;
164 * Now send JobId and permissions, and get back the authorization key.
166 pm_strcpy(job_name, jcr->job->name());
167 bash_spaces(job_name);
168 pm_strcpy(client_name, jcr->client->name());
169 bash_spaces(client_name);
170 pm_strcpy(fileset_name, jcr->fileset->name());
171 bash_spaces(fileset_name);
// Substitute a placeholder when the FileSet has no MD5, presumably so the
// FileSetMD5= field of jobcmd is never empty.
172 if (jcr->fileset->MD5[0] == 0) {
173 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
175 /* If rescheduling, cancel the previous incarnation of this job
176 * with the SD, which might be waiting on the FD connection.
177 * If we do not cancel it the SD will not accept a new connection
178 * for the same jobid.
180 if (jcr->reschedule_count) {
181 sd->fsend("cancel Job=%s\n", jcr->Job);
182 while (sd->recv() >= 0)
185 sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
186 job_name.c_str(), client_name.c_str(),
187 jcr->get_JobType(), jcr->get_JobLevel(),
188 fileset_name.c_str(), !jcr->pool->catalog_files,
189 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
190 jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
191 edit_int64(jcr->spool_size, ed2));
192 Dmsg1(100, ">stored: %s\n", sd->msg);
193 if (bget_dirmsg(sd) > 0) {
194 Dmsg1(100, "<stored: %s", sd->msg);
// All three OKjob fields must parse; otherwise the job is rejected.
// NOTE(review): OKjob's %100s writes up to 101 bytes (including NUL) into
//   auth_key. Confirm its declared size is at least 101.
195 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
196 &jcr->VolSessionTime, &auth_key) != 3) {
197 Dmsg1(100, "BadJob=%s\n", sd->msg);
198 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
201 jcr->sd_auth_key = bstrdup(auth_key);
202 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
205 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
211 * We have two loops here. The first comes from the
212 * Storage = associated with the Job, and we need
213 * to attach to each one.
214 * The inner loop loops over all the alternative devices
215 * associated with each Storage. It selects the first
219 /* Do read side of storage daemon */
221 /* For the moment, only migrate, copy and vbackup have rpool */
222 if (jcr->get_JobType() == JT_MIGRATE || jcr->get_JobType() == JT_COPY ||
223 (jcr->get_JobType() == JT_BACKUP && jcr->get_JobLevel() == L_VIRTUAL_FULL)) {
224 pm_strcpy(pool_type, jcr->rpool->pool_type);
225 pm_strcpy(pool_name, jcr->rpool->name());
227 pm_strcpy(pool_type, jcr->pool->pool_type);
228 pm_strcpy(pool_name, jcr->pool->name());
230 bash_spaces(pool_type);
231 bash_spaces(pool_name);
// Read side protocol:
//   - one use_storage line (append=0) per read Storage,
//   - then one use_device line per candidate Device,
//   - then an EOD ending that Storage's Device list,
//   - and a final EOD ending the Storage list.
// NOTE(review): the write-side loop below repeats this logic with
//   append=1. A shared helper would remove the duplication.
232 foreach_alist(storage, rstore) {
233 Dmsg1(100, "Rstore=%s\n", storage->name());
234 pm_strcpy(store_name, storage->name());
235 bash_spaces(store_name);
236 pm_strcpy(media_type, storage->media_type);
237 bash_spaces(media_type);
238 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
239 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
240 Dmsg1(100, "rstore >stored: %s", sd->msg);
242 /* Loop over alternative storage Devices until one is OK */
243 foreach_alist(dev, storage->device) {
244 pm_strcpy(device_name, dev->name());
245 bash_spaces(device_name);
246 sd->fsend(use_device, device_name.c_str());
247 Dmsg1(100, ">stored: %s", sd->msg);
249 sd->signal(BNET_EOD); /* end of Devices */
251 sd->signal(BNET_EOD); /* end of Storages */
252 if (bget_dirmsg(sd) > 0) {
253 Dmsg1(100, "<stored: %s", sd->msg);
254 /* ****FIXME**** save actual device name */
// NOTE(review): OK_device's %s is unbounded and writes into device_name's
//   buffer. A long device name in the reply could overflow it.
255 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
261 /* Do write side of storage daemon */
263 pm_strcpy(pool_type, jcr->pool->pool_type);
264 pm_strcpy(pool_name, jcr->pool->name());
265 bash_spaces(pool_type);
266 bash_spaces(pool_name);
// Write side: same protocol as the read side, but with append=1 and
// always using jcr->pool.
267 foreach_alist(storage, wstore) {
268 pm_strcpy(store_name, storage->name());
269 bash_spaces(store_name);
270 pm_strcpy(media_type, storage->media_type);
271 bash_spaces(media_type);
272 sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
273 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
275 Dmsg1(100, "wstore >stored: %s", sd->msg);
277 /* Loop over alternative storage Devices until one is OK */
278 foreach_alist(dev, storage->device) {
279 pm_strcpy(device_name, dev->name());
280 bash_spaces(device_name);
281 sd->fsend(use_device, device_name.c_str());
282 Dmsg1(100, ">stored: %s", sd->msg);
284 sd->signal(BNET_EOD); /* end of Devices */
286 sd->signal(BNET_EOD); /* end of Storages */
287 if (bget_dirmsg(sd) > 0) {
288 Dmsg1(100, "<stored: %s", sd->msg);
289 /* ****FIXME**** save actual device name */
// NOTE(review): same unbounded %s concern as on the read side.
290 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
// Two fatal reports are possible:
//   - one quotes the SD's reply (copied from sd->msg into err_msg first),
//   - the other reports only the rejected device name.
298 pm_strcpy(err_msg, sd->msg); /* save message */
299 Jmsg(jcr, M_FATAL, 0, _("\n"
300 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
301 device_name.c_str(), err_msg.c_str()/* sd->msg */);
303 Jmsg(jcr, M_FATAL, 0, _("\n"
304 " Storage daemon didn't accept Device \"%s\" command.\n"),
305 device_name.c_str());
308 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
314 * Start a thread to handle Storage daemon messages and
// start_storage_daemon_message_thread:
//   Takes a JCR reference for the new thread; msg_thread_cleanup() releases
//   it with free_jcr(). Clears sd_msg_thread_done and SD_msg_chan, then
//   starts msg_thread() with the JCR as its argument. If pthread_create()
//   fails, it reports with Jmsg1(M_ABORT).
//   It then polls until msg_thread() publishes its thread id in
//   jcr->SD_msg_chan. The loop also checks job_canceled() and
//   sd_msg_thread_done.
317 bool start_storage_daemon_message_thread(JCR *jcr)
322 jcr->inc_use_count(); /* mark in use by msg thread */
323 jcr->sd_msg_thread_done = false;
324 jcr->SD_msg_chan = 0;
325 Dmsg0(100, "Start SD msg_thread.\n");
326 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
328 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
330 /* Wait for thread to start */
// NOTE(review): SD_msg_chan is written by msg_thread() and polled here
//   with no visible synchronization. Also, POSIX treats pthread_t as
//   opaque, so comparing it with 0 is not portable.
331 while (jcr->SD_msg_chan == 0) {
333 if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
337 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
// Cleanup handler for msg_thread(); msg_thread() installs it with
// pthread_cleanup_push() and runs it via pthread_cleanup_pop(1).
//   arg - the JCR passed to msg_thread().
// Steps:
//   - end any open catalog transaction;
//   - mark the message thread done and clear SD_msg_chan;
//   - wake waiters on jcr->term_wait;
//   - drop the JCR reference taken by start_storage_daemon_message_thread();
//   - remove thread-specific DB data.
// The JCR must not be used after free_jcr().
// NOTE(review): term_wait is broadcast without visibly holding `mutex`.
//   wait_for_storage_daemon_termination() waits with a 5-second timeout,
//   so a missed wakeup only delays it.
341 extern "C" void msg_thread_cleanup(void *arg)
343 JCR *jcr = (JCR *)arg;
344 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
345 jcr->sd_msg_thread_done = true;
346 jcr->SD_msg_chan = 0;
347 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
348 Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
349 free_jcr(jcr); /* release jcr */
350 db_thread_cleanup(); /* remove thread specific data */
354 * Handle the message channel (i.e. requests from the
356 * Note, we are running in a separate thread.
// msg_thread:
//   Thread body started by start_storage_daemon_message_thread(); arg is
//   the JCR. It detaches itself and publishes its id in jcr->SD_msg_chan.
//   It then reads messages from jcr->store_bsock until the job is canceled
//   or bget_dirmsg() returns a negative value.
//   A Job_end message supplies the SD's final JobStatus, JobFiles and
//   JobBytes, which are stored on the JCR. A socket error (is_bnet_error)
//   sets SDJobStatus to JS_ErrorTerminated.
//   msg_thread_cleanup() runs on exit via pthread_cleanup_pop(1).
358 extern "C" void *msg_thread(void *arg)
360 JCR *jcr = (JCR *)arg;
363 char Job[MAX_NAME_LENGTH];
368 pthread_detach(pthread_self());
370 jcr->SD_msg_chan = pthread_self();
371 pthread_cleanup_push(msg_thread_cleanup, arg);
372 sd = jcr->store_bsock;
374 /* Read the Storage daemon's output.
376 Dmsg0(100, "Start msg_thread loop\n");
377 while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
378 Dmsg1(400, "<stored: %s", sd->msg);
// NOTE(review): Job_start/Job_end scan up to 127 characters plus NUL into
//   Job. This requires MAX_NAME_LENGTH >= 128 (confirm).
379 if (sscanf(sd->msg, Job_start, Job) == 1) {
382 if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
384 jcr->SDJobStatus = JobStatus; /* termination status */
385 jcr->SDJobFiles = JobFiles;
386 jcr->SDJobBytes = JobBytes;
389 Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
391 if (is_bnet_error(sd)) {
392 jcr->SDJobStatus = JS_ErrorTerminated;
394 pthread_cleanup_pop(1); /* remove and execute the handler */
// wait_for_storage_daemon_termination:
//   Loops until msg_thread() sets jcr->sd_msg_thread_done. Each pass waits
//   on jcr->term_wait with an absolute deadline 5 seconds ahead.
//   If the job is canceled while the message thread is still registered
//   (SD_msg_chan non-zero), it:
//     - marks the SD socket timed out and terminated;
//     - sends TIMEOUT_SIGNAL to that thread with pthread_kill().
//   cancel_count == 6 corresponds to 6 x 5-second waits, i.e. the
//   30 seconds mentioned below.
// NOTE(review): pthread_cond_timedwait() requires `mutex` to be locked by
//   the calling thread. Confirm it is locked around this call.
398 void wait_for_storage_daemon_termination(JCR *jcr)
400 int cancel_count = 0;
401 /* Now wait for Storage daemon to terminate our message thread */
402 while (!jcr->sd_msg_thread_done) {
405 struct timespec timeout;
407 gettimeofday(&tv, &tz);
409 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
410 Dmsg0(400, "I'm waiting for message thread termination.\n");
412 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
414 if (job_canceled(jcr)) {
415 if (jcr->SD_msg_chan) {
416 jcr->store_bsock->set_timed_out();
417 jcr->store_bsock->set_terminated();
418 Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
419 pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
423 /* Give SD 30 seconds to clean up after cancel */
424 if (cancel_count == 6) {
428 set_jcr_job_status(jcr, JS_Terminated);
// device_thread:
//   Detached thread started by init_device_resources(), which passes NULL
//   as arg. It creates a "*DeviceInit*" control JCR (JT_SYSTEM). Then, in
//   a loop bounded by MAX_TRIES, it:
//     - connects to the SD (retry interval 10, max retry time 30, verbose);
//     - calls update_device_res() for every R_DEVICE resource.
//   Afterwards it closes the SD socket and clears jcr->store_bsock.
434 extern "C" void *device_thread(void *arg)
441 pthread_detach(pthread_self());
442 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
443 for (i=0; i < MAX_TRIES; i++) {
444 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
445 Dmsg0(900, "Failed connecting to SD.\n");
449 foreach_res(dev, R_DEVICE) {
450 if (!update_device_res(jcr, dev)) {
451 Dmsg1(900, "Error updating device=%s\n", dev->name());
453 Dmsg1(900, "Updated Device=%s\n", dev->name());
457 bnet_close(jcr->store_bsock);
458 jcr->store_bsock = NULL;
467 * Start a thread to handle getting Device resource information
468 * from SD. This is called once at startup of the Director.
// init_device_resources:
//   Starts device_thread() with a NULL argument. If pthread_create() fails,
//   it reports with Jmsg1(M_ABORT).
470 void init_device_resources()
475 Dmsg0(100, "Start Device thread.\n");
476 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
// NOTE(review): this abort text says "message thread", but the thread
//   created here is device_thread. Consider "Cannot create device thread".
478 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));