3 * Bacula Director -- msgchan.c -- handles the message channel
4 * to the Storage daemon and the File daemon.
6 * Kern Sibbald, August MM
8 * This routine runs as a thread and must be thread reentrant.
10 * Basic tasks done here:
11 * Open a message channel with the Storage daemon
12 * to authenticate ourselves and to pass the JobId.
13 * Create a thread to interact with the Storage daemon
14 * who returns a job status and requests Catalog services, etc.
19 Bacula® - The Network Backup Solution
21 Copyright (C) 2000-2006 Free Software Foundation Europe e.V.
23 The main author of Bacula is Kern Sibbald, with contributions from
24 many others, a complete list can be found in the file AUTHORS.
25 This program is Free Software; you can redistribute it and/or
26 modify it under the terms of version two of the GNU General Public
27 License as published by the Free Software Foundation plus additions
28 that are listed in the file LICENSE.
30 This program is distributed in the hope that it will be useful, but
31 WITHOUT ANY WARRANTY; without even the implied warranty of
32 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33 General Public License for more details.
35 You should have received a copy of the GNU General Public License
36 along with this program; if not, write to the Free Software
37 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
40 Bacula® is a registered trademark of John Walker.
41 The licensor of Bacula is the Free Software Foundation Europe
42 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
43 Switzerland, email:ftf@fsfeurope.org.
// File-scope state and the protocol strings exchanged with the Storage
// daemon (SD).
// NOTE(review): in this view every line starts with a numeric label and the
// labels skip values, so some original lines appear to be missing here.
//
// mutex: handed to pthread_cond_timedwait() together with jcr->term_wait in
// wait_for_storage_daemon_termination().
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
51 /* Commands sent to Storage daemon */
// jobcmd: job announcement sent by start_storage_daemon_job().
52 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
53 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
// use_storage: sent once per Storage resource; start_storage_daemon_job()
// passes append=0 for the read list and append=1 for the write list.
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56 "pool_type=%s append=%d copy=%d stripe=%d\n";
// use_device: sent once per Device belonging to a Storage resource.
57 static char use_device[] = "use device=%s\n";
// NOTE(review): query_device is commented out here, yet update_device_res()
// still passes query_device to bnet_fsend(). Confirm whether that function is
// compiled at all (a preprocessor guard may have been lost from this view).
58 //static char query_device[] = _("query device=%s");
60 /* Response from Storage daemon */
// OKjob: parsed with sscanf() in start_storage_daemon_job(); the key field is
// width-limited to 100 characters.
61 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
// NOTE(review): the %s here has no field width. It is scanned into
// device_name.c_str() in start_storage_daemon_job(); the visible lines do not
// show the destination being sized for the reply first.
62 static char OK_device[] = "3000 OK use device device=%s\n";
64 /* Storage Daemon requests */
// Job_start and Job_end are matched against incoming SD messages in
// msg_thread(). The job name is limited to 127 characters and is scanned into
// a char Job[MAX_NAME_LENGTH] buffer (assumes MAX_NAME_LENGTH >= 128; TODO
// confirm).
65 static char Job_start[] = "3010 Job %127s start\n";
66 static char Job_end[] =
67 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
69 /* Forward referenced functions */
// msg_thread is the start routine given to pthread_create() in
// start_storage_daemon_message_thread().
70 extern "C" void *msg_thread(void *arg);
73 * Establish a message channel connection with the Storage daemon
74 * and perform authentication.
// connect_to_storage_daemon
//
// Opens the message channel to the Storage daemon for this job and
// authenticates over it.
//
//   jcr             job control record; the socket is stored in
//                   jcr->store_bsock
//   retry_interval,
//   max_retry_time,
//   verbose         forwarded unchanged to bnet_connect()
//
// Returns true at once when jcr->store_bsock is already set.
// NOTE(review): the declarations of sd and store, the code that picks store,
// and the failure returns are not present in this view (the line labels skip
// them), so only the visible steps are described below.
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77 int max_retry_time, int verbose)
// Reuse an existing connection instead of opening a second one.
82 if (jcr->store_bsock) {
83 return true; /* already connected */
86 /* If there is a write storage use it */
94 * Open message channel with the Storage daemon
96 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
// Connect to the address and SDport of the selected Storage resource.
98 sd = bnet_connect(jcr, retry_interval, max_retry_time,
99 _("Storage daemon"), store->address,
100 NULL, store->SDport, verbose);
// Remember which Storage resource this socket talks to, and publish the
// socket on the JCR before authenticating.
104 sd->res = (RES *)store; /* save pointer to other end */
105 jcr->store_bsock = sd;
// If authentication fails the socket pointer is removed from the JCR again.
// NOTE(review): label 108 is missing between the next two lines; presumably
// the socket is closed there. Verify against the full file.
107 if (!authenticate_storage_daemon(jcr, store)) {
109 jcr->store_bsock = NULL;
116 * Here we ask the SD to send us the info for a
117 * particular device resource.
// update_device_res
//
// Asks the Storage daemon for the current information about one Device
// resource.
//
//   jcr   job control record used for the SD connection
//   dev   Device resource whose name (dev->hdr.name) is sent in the query
//
// NOTE(review): the failure returns are not present in this view. The format
// string query_device used below is only visible as a commented-out
// definition at file scope; confirm this function is actually compiled.
120 bool update_device_res(JCR *jcr, DEVICE *dev)
122 POOL_MEM device_name;
// Make sure the SD channel is open: retry interval 5, max retry time 30,
// verbose off.
124 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
127 sd = jcr->store_bsock;
// Work on a copy of the resource name; bash_spaces() presumably replaces
// blanks so the name travels as one token. TODO confirm.
128 pm_strcpy(device_name, dev->hdr.name);
129 bash_spaces(device_name);
130 bnet_fsend(sd, query_device, device_name.c_str());
131 Dmsg1(100, ">stored: %s\n", sd->msg);
132 /* The data is returned through Device_update */
// A result of zero or less from bget_dirmsg() is treated as the failure case.
133 if (bget_dirmsg(sd) <= 0) {
141 * Start a job with the Storage daemon
// start_storage_daemon_job
//
// Announces the job to the Storage daemon over jcr->store_bsock, stores the
// authorization key from the reply in jcr->sd_auth_key, and then tells the SD
// which Storage resources and Devices may be used for reading (rstore) and
// writing (wstore).
//
//   jcr      job control record (job, client, fileset, pool and rpool are
//            read)
//   rstore   list of Storage resources for the read side
//   wstore   list of Storage resources for the write side
//
// NOTE(review): several original lines are not present in this view (local
// declarations such as sd, ed1, auth_key, copy, stripe, ok and err_msg, the
// guards around the read and write sections, and all return statements).
// Comments below describe the visible statements only.
143 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
149 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
150 POOL_MEM job_name, client_name, fileset_name;
155 sd = jcr->store_bsock;
157 * Now send JobId and permissions, and get back the authorization key.
// Resource names are copied and passed through bash_spaces() before being
// placed in the command line.
159 pm_strcpy(job_name, jcr->job->hdr.name);
160 bash_spaces(job_name);
161 pm_strcpy(client_name, jcr->client->hdr.name);
162 bash_spaces(client_name);
163 pm_strcpy(fileset_name, jcr->fileset->hdr.name);
164 bash_spaces(fileset_name);
// An empty MD5 is replaced by a placeholder so the FileSetMD5 field of jobcmd
// is never sent empty. Note that this writes into the shared fileset
// resource, not into a per-job copy.
165 if (jcr->fileset->MD5[0] == 0) {
166 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
168 /* If rescheduling, cancel the previous incarnation of this job
169 * with the SD, which might be waiting on the FD connection.
170 * If we do not cancel it the SD will not accept a new connection
171 * for the same jobid.
// On a rescheduled job: send the cancel and read replies until bnet_recv()
// returns a negative value.
173 if (jcr->reschedule_count) {
174 bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
175 while (bnet_recv(sd) >= 0)
// Send the job announcement. NoAttr is the negation of pool->catalog_files.
178 bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
179 job_name.c_str(), client_name.c_str(),
180 jcr->JobType, jcr->JobLevel,
181 fileset_name.c_str(), !jcr->pool->catalog_files,
182 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
183 jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
184 Dmsg1(100, ">stored: %s\n", sd->msg);
// The reply must match OKjob with all three fields (session id, session time,
// authorization key); anything else is reported as a fatal job message.
185 if (bget_dirmsg(sd) > 0) {
186 Dmsg1(100, "<stored: %s", sd->msg);
187 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
188 &jcr->VolSessionTime, &auth_key) != 3) {
189 Dmsg1(100, "BadJob=%s\n", sd->msg);
190 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
// Keep a heap copy of the key on the JCR.
193 jcr->sd_auth_key = bstrdup(auth_key);
194 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
// Reached when no usable reply was read from the SD.
197 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
203 * We have two loops here. The first comes from the
204 * Storage = associated with the Job, and we need
205 * to attach to each one.
206 * The inner loop loops over all the alternative devices
207 * associated with each Storage. It selects the first
211 /* Do read side of storage daemon */
213 /* For the moment, only migrate has rpool */
// Read side: a migration job takes pool name and type from jcr->rpool, every
// other job type from jcr->pool.
214 if (jcr->JobType == JT_MIGRATE) {
215 pm_strcpy(pool_type, jcr->rpool->pool_type);
216 pm_strcpy(pool_name, jcr->rpool->name());
218 pm_strcpy(pool_type, jcr->pool->pool_type);
219 pm_strcpy(pool_name, jcr->pool->name());
221 bash_spaces(pool_type);
222 bash_spaces(pool_name);
223 foreach_alist(storage, rstore) {
224 Dmsg1(100, "Rstore=%s\n", storage->name());
// NOTE(review): store_name is passed to bash_spaces() here without a visible
// pm_strcpy(store_name, storage->name()) first, unlike the write-side loop
// further down. The labels 224 and 225 are consecutive, so no line seems to
// be missing at this spot; if so, the read side sends whatever store_name
// already holds (empty on first use). Looks like a bug; confirm against the
// full file.
225 bash_spaces(store_name);
226 pm_strcpy(media_type, storage->media_type);
227 bash_spaces(media_type);
// append=0 marks this Storage as a read storage.
228 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
229 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
230 Dmsg1(100, "rstore >stored: %s", sd->msg);
232 /* Loop over alternative storage Devices until one is OK */
// Every Device of the Storage is offered; an EOD signal ends the device list
// and a second EOD signal ends the storage list.
233 foreach_alist(dev, storage->device) {
234 pm_strcpy(device_name, dev->hdr.name);
235 bash_spaces(device_name);
236 bnet_fsend(sd, use_device, device_name.c_str());
237 Dmsg1(100, ">stored: %s", sd->msg);
239 bnet_sig(sd, BNET_EOD); /* end of Devices */
241 bnet_sig(sd, BNET_EOD); /* end of Storages */
242 if (bget_dirmsg(sd) > 0) {
243 Dmsg1(100, "<stored: %s", sd->msg);
244 /* ****FIXME**** save actual device name */
// NOTE(review): OK_device scans an unbounded %s into the device_name buffer;
// no size check on device_name is visible before this call. Confirm the
// buffer is large enough for any reply.
245 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
251 /* Do write side of storage daemon */
// Write side: always uses jcr->pool.
253 pm_strcpy(pool_type, jcr->pool->pool_type);
254 pm_strcpy(pool_name, jcr->pool->name());
255 bash_spaces(pool_type);
256 bash_spaces(pool_name);
257 foreach_alist(storage, wstore) {
258 pm_strcpy(store_name, storage->name());
259 bash_spaces(store_name);
260 pm_strcpy(media_type, storage->media_type);
261 bash_spaces(media_type);
// append=1 marks this Storage as a write storage.
262 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
263 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
265 Dmsg1(100, "wstore >stored: %s", sd->msg);
267 /* Loop over alternative storage Devices until one is OK */
268 foreach_alist(dev, storage->device) {
269 pm_strcpy(device_name, dev->hdr.name);
270 bash_spaces(device_name);
271 bnet_fsend(sd, use_device, device_name.c_str());
272 Dmsg1(100, ">stored: %s", sd->msg);
274 bnet_sig(sd, BNET_EOD); /* end of Devices */
276 bnet_sig(sd, BNET_EOD); /* end of Storages */
277 if (bget_dirmsg(sd) > 0) {
278 Dmsg1(100, "<stored: %s", sd->msg);
279 /* ****FIXME**** save actual device name */
// NOTE(review): same unbounded %s scan into device_name as on the read side.
280 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
// Failure reporting. The SD text is copied to err_msg before Jmsg() is
// called, presumably because sd->msg may be reused while the message is
// delivered. TODO confirm. The conditions selecting between the two fatal
// messages and the final info message are not present in this view.
288 pm_strcpy(err_msg, sd->msg); /* save message */
289 Jmsg(jcr, M_FATAL, 0, _("\n"
290 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
291 device_name.c_str(), err_msg.c_str()/* sd->msg */);
293 Jmsg(jcr, M_FATAL, 0, _("\n"
294 " Storage daemon didn't accept Device \"%s\" command.\n"),
295 device_name.c_str());
298 Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
304 * Start a thread to handle Storage daemon messages and
// start_storage_daemon_message_thread
//
// Creates the thread that runs msg_thread() for this job and waits until that
// thread has announced itself through jcr->SD_msg_chan.
//
//   jcr   job control record shared with the new thread
//
// NOTE(review): local declarations (status, thid, be), the body of the wait
// loop and the return statements are not present in this view.
307 bool start_storage_daemon_message_thread(JCR *jcr)
// Take a reference for the new thread; msg_thread_cleanup() calls free_jcr()
// when the thread ends.
312 jcr->inc_use_count(); /* mark in use by msg thread */
// Reset the handshake fields before the thread exists: msg_thread() stores
// its thread id in SD_msg_chan, msg_thread_cleanup() sets the done flag.
313 jcr->sd_msg_thread_done = false;
314 jcr->SD_msg_chan = 0;
315 Dmsg0(100, "Start SD msg_thread.\n");
// A failed thread creation is reported with M_ABORT.
316 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
318 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
320 /* Wait for thread to start */
// Loop until the thread has published its id. Cancellation of the job, or a
// thread that already finished, is checked inside the loop; what happens in
// that case is not visible here.
321 while (jcr->SD_msg_chan == 0) {
323 if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
327 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
// msg_thread_cleanup
//
// Cleanup handler registered by msg_thread() with pthread_cleanup_push().
//
//   arg   the JCR pointer that was handed to the thread
//
// Marks the message thread as finished, wakes every waiter on jcr->term_wait
// and drops the JCR reference.
331 extern "C" void msg_thread_cleanup(void *arg)
333 JCR *jcr = (JCR *)arg;
334 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
// wait_for_storage_daemon_termination() loops on sd_msg_thread_done and uses
// SD_msg_chan to decide whether there is still a thread to signal.
335 jcr->sd_msg_thread_done = true;
336 jcr->SD_msg_chan = 0;
337 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
338 Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
// Counterpart of the inc_use_count() call made in
// start_storage_daemon_message_thread().
339 free_jcr(jcr); /* release jcr */
343 * Handle the message channel (i.e. requests from the
345 * Note, we are running in a separate thread.
// msg_thread
//
// Thread body that reads the messages the Storage daemon sends on
// jcr->store_bsock for one job.
//
//   arg   the JCR pointer of the job
//
// NOTE(review): the declarations of sd, stat, JobStatus, JobFiles and
// JobBytes, parts of the loop body and the return statement are not present
// in this view.
347 extern "C" void *msg_thread(void *arg)
349 JCR *jcr = (JCR *)arg;
// Receives the job name scanned through the %127s field of Job_start and
// Job_end.
352 char Job[MAX_NAME_LENGTH];
// Detached thread: nobody joins it. Publishing the thread id releases the
// wait loop in start_storage_daemon_message_thread().
357 pthread_detach(pthread_self());
358 jcr->SD_msg_chan = pthread_self();
// From here on msg_thread_cleanup() is registered as the cleanup handler.
359 pthread_cleanup_push(msg_thread_cleanup, arg);
360 sd = jcr->store_bsock;
362 /* Read the Storage daemon's output.
364 Dmsg0(100, "Start msg_thread loop\n");
// Keep reading until the job is canceled or bget_dirmsg() returns a negative
// value.
365 while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
366 Dmsg1(400, "<stored: %s", sd->msg);
// Job start notice: only the job name is parsed.
367 if (sscanf(sd->msg, Job_start, Job) == 1) {
// Job end notice: the final status and counters reported by the SD are
// copied to the JCR.
370 if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
372 jcr->SDJobStatus = JobStatus; /* termination status */
373 jcr->SDJobFiles = JobFiles;
374 jcr->SDJobBytes = JobBytes;
377 Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
// A socket error overrides whatever status was recorded before.
379 if (is_bnet_error(sd)) {
380 jcr->SDJobStatus = JS_ErrorTerminated;
// Argument 1 means the handler is executed here as well as removed.
382 pthread_cleanup_pop(1); /* remove and execute the handler */
// wait_for_storage_daemon_termination
//
// Blocks until the message thread of this job has finished
// (jcr->sd_msg_thread_done), then sets the job status to JS_Terminated.
//
//   jcr   job control record of the job being waited for
//
// NOTE(review): the declarations of tv and tz, the tv_nsec assignment, any
// locking of the file-scope mutex around the timed wait, and the update of
// cancel_count are not present in this view. POSIX requires the mutex to be
// locked by the caller of pthread_cond_timedwait(); verify in the full file.
386 void wait_for_storage_daemon_termination(JCR *jcr)
388 int cancel_count = 0;
389 /* Now wait for Storage daemon to terminate our message thread */
390 while (!jcr->sd_msg_thread_done) {
393 struct timespec timeout;
// Absolute deadline five seconds after the current time.
395 gettimeofday(&tv, &tz);
397 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
398 Dmsg0(400, "I'm waiting for message thread termination.\n");
// Woken early by the pthread_cond_broadcast() in msg_thread_cleanup().
400 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
// For a canceled job whose message thread still exists: flag the socket as
// timed out and terminated, then signal the thread, presumably so that a
// blocking read in it returns. TODO confirm.
402 if (job_canceled(jcr)) {
403 if (jcr->SD_msg_chan) {
404 jcr->store_bsock->timed_out = 1;
405 jcr->store_bsock->terminated = 1;
406 Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
407 pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
411 /* Give SD 30 seconds to clean up after cancel */
// Six passes of the five second wait make up the 30 seconds mentioned above;
// the action taken when the count is reached is not visible here.
412 if (cancel_count == 6) {
416 set_jcr_job_status(jcr, JS_Terminated);
// device_thread
//
// Thread body started by init_device_resources(). It connects to the Storage
// daemon with a control JCR and calls update_device_res() for every Device
// resource.
//
//   arg   not referenced in the visible lines
//
// NOTE(review): the declarations of jcr, i and dev, the handling of a failed
// connection, the end of the retry loop and the return statement are not
// present in this view.
422 extern "C" void *device_thread(void *arg)
429 pthread_detach(pthread_self());
// Control JCR of type JT_SYSTEM, used only for this start-up query.
430 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
// Up to MAX_TRIES connection attempts: retry interval 10, max retry time 30,
// verbose on.
431 for (i=0; i < MAX_TRIES; i++) {
432 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
433 Dmsg0(900, "Failed connecting to SD.\n");
// Query each Device resource; the outcome is only written to the debug log.
437 foreach_res(dev, R_DEVICE) {
438 if (!update_device_res(jcr, dev)) {
439 Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
441 Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
// Close the channel and clear the pointer so that a later
// connect_to_storage_daemon() call on this JCR opens a new socket.
445 bnet_close(jcr->store_bsock);
446 jcr->store_bsock = NULL;
455 * Start a thread to handle getting Device resource information
456 * from SD. This is called once at startup of the Director.
// init_device_resources
//
// Starts a thread running device_thread(), which fetches Device resource
// information from the Storage daemon.
//
// NOTE(review): the declarations of status, thid and be are not present in
// this view, and the function may continue past the last visible line.
// The error text below says message thread although the thread being created
// is the device thread; it is a runtime string and is left as is.
458 void init_device_resources()
463 Dmsg0(100, "Start Device thread.\n");
// A failed thread creation is reported with M_ABORT and no JCR.
464 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
466 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));