3 * Bacula Director -- msgchan.c -- handles the message channel
4 * to the Storage daemon and the File daemon.
6 * Kern Sibbald, August MM
8 * This routine runs as a thread and must be thread reentrant.
10 * Basic tasks done here:
11 * Open a message channel with the Storage daemon
12 * to authenticate ourselves and to pass the JobId.
13 * Create a thread to interact with the Storage daemon,
14 * which returns a job status and requests Catalog services, etc.
19 Copyright (C) 2000-2006 Kern Sibbald
21 This program is free software; you can redistribute it and/or
22 modify it under the terms of the GNU General Public License
23 version 2 as amended with additional clauses defined in the
24 file LICENSE in the main source directory.
26 This program is distributed in the hope that it will be useful,
27 but WITHOUT ANY WARRANTY; without even the implied warranty of
28 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 file LICENSE for additional details.
// Mutex passed to pthread_cond_timedwait() together with jcr->term_wait
// in wait_for_storage_daemon_termination().
36 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
38 /* Commands sent to Storage daemon */
// Job registration command, sent with bnet_fsend() in
// start_storage_daemon_job(). Name arguments are passed after
// bash_spaces(); NoAttr is sent as !jcr->pool->catalog_files.
39 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
40 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
41 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
// Sent once per Storage resource; append is 0 for the read storages and
// 1 for the write storages in start_storage_daemon_job().
42 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
43 "pool_type=%s append=%d copy=%d stripe=%d\n";
// Sent once per candidate Device of the Storage named in the preceding
// use_storage command.
44 static char use_device[] = "use device=%s\n";
// NOTE(review): this declaration is commented out, but update_device_res()
// still passes query_device to bnet_fsend(), so that reference does not
// resolve. If it is restored, confirm _() expands to something usable in a
// static array initializer (a gettext() call would not be).
45 //static char query_device[] = _("query device=%s");
47 /* Response from Storage daemon */
// Parsed with sscanf() into VolSessionId, VolSessionTime and the
// authorization key. NOTE(review): %100s needs a destination of at least
// 101 bytes; confirm the size of auth_key in start_storage_daemon_job().
48 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
// NOTE(review): %s has no field width, so any sscanf() destination must be
// at least as large as the received message.
49 static char OK_device[] = "3000 OK use device device=%s\n";
51 /* Storage Daemon requests */
// Matched in msg_thread(). %127s needs a 128-byte destination; msg_thread()
// scans into char Job[MAX_NAME_LENGTH] -- confirm MAX_NAME_LENGTH >= 128.
52 static char Job_start[] = "3010 Job %127s start\n";
53 static char Job_end[] =
54 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
56 /* Forward referenced functions */
57 extern "C" void *msg_thread(void *arg);
// connect_to_storage_daemon(): open, or reuse, this JCR's socket to the
// Storage daemon.
//   retry_interval, max_retry_time, verbose -- passed unchanged to
//   bnet_connect().
// Returns true immediately if jcr->store_bsock is already set.
60 * Establish a message channel connection with the Storage daemon
61 * and perform authentication.
63 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
64 int max_retry_time, int verbose)
69 if (jcr->store_bsock) {
70 return true; /* already connected */
73 /* If there is a write storage use it */
75 store = (STORE *)jcr->wstorage->first();
// ...otherwise the first read storage is used.
77 store = (STORE *)jcr->rstorage->first();
81 * Open message channel with the Storage daemon
83 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
85 sd = bnet_connect(jcr, retry_interval, max_retry_time,
86 _("Storage daemon"), store->address,
87 NULL, store->SDport, verbose);
// Record which STORE resource this socket talks to, and publish the socket
// in the JCR before authenticating.
91 sd->res = (RES *)store; /* save pointer to other end */
92 jcr->store_bsock = sd;
// On authentication failure the JCR's socket pointer is cleared, so the
// already-connected check at the top will not reuse this socket.
94 if (!authenticate_storage_daemon(jcr, store)) {
96 jcr->store_bsock = NULL;
// update_device_res(): ask the Storage daemon for information about one
// Director Device resource.
//   dev -- Device whose hdr.name (spaces bashed) is sent with query_device.
// If not already connected, connects first with
// connect_to_storage_daemon(jcr, 5, 30, 0).
103 * Here we ask the SD to send us the info for a
104 * particular device resource.
107 bool update_device_res(JCR *jcr, DEVICE *dev)
109 POOL_MEM device_name;
111 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
114 sd = jcr->store_bsock;
115 pm_strcpy(device_name, dev->hdr.name);
116 bash_spaces(device_name);
// NOTE(review): query_device exists only as a commented-out declaration
// at file scope, so this reference does not resolve as written.
117 bnet_fsend(sd, query_device, device_name.c_str());
118 Dmsg1(100, ">stored: %s\n", sd->msg);
119 /* The data is returned through Device_update */
120 if (bget_dirmsg(sd) <= 0) {
// start_storage_daemon_job(): register this Job with the Storage daemon
// over jcr->store_bsock and tell it which storages and devices to use.
//   rstore -- Storage resources for reading (sent with append=0)
//   wstore -- Storage resources for writing (sent with append=1)
// The SD's reply to the Job command fills jcr->VolSessionId,
// jcr->VolSessionTime and jcr->sd_auth_key (a bstrdup() copy).
// Rejections are reported with Jmsg(M_FATAL).
128 * Start a job with the Storage daemon
130 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
136 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
137 POOL_MEM job_name, client_name, fileset_name;
142 sd = jcr->store_bsock;
144 * Now send JobId and permissions, and get back the authorization key.
// Names are bash_spaces()'d before sending -- presumably so each travels
// as a single whitespace-free token for the SD's scanf-style parsing.
146 pm_strcpy(job_name, jcr->job->hdr.name);
147 bash_spaces(job_name);
148 pm_strcpy(client_name, jcr->client->hdr.name);
149 bash_spaces(client_name);
150 pm_strcpy(fileset_name, jcr->fileset->hdr.name);
151 bash_spaces(fileset_name);
// An empty FileSet MD5 is replaced with the placeholder "**Dummy**".
152 if (jcr->fileset->MD5[0] == 0) {
153 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
155 /* If rescheduling, cancel the previous incarnation of this job
156 * with the SD, which might be waiting on the FD connection.
157 * If we do not cancel it the SD will not accept a new connection
158 * for the same jobid.
160 if (jcr->reschedule_count) {
161 bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
162 while (bnet_recv(sd) >= 0)
// NoAttr is sent as the negation of the pool's catalog_files flag.
165 bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
166 job_name.c_str(), client_name.c_str(),
167 jcr->JobType, jcr->JobLevel,
168 fileset_name.c_str(), !jcr->pool->catalog_files,
169 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
170 jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
171 Dmsg1(100, ">stored: %s\n", sd->msg);
172 if (bget_dirmsg(sd) > 0) {
173 Dmsg1(100, "<stored: %s", sd->msg);
// NOTE(review): OKjob scans Authorization with %100s, so auth_key must
// hold at least 101 bytes -- confirm its declaration.
174 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
175 &jcr->VolSessionTime, &auth_key) != 3) {
176 Dmsg1(100, "BadJob=%s\n", sd->msg);
177 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
180 jcr->sd_auth_key = bstrdup(auth_key);
181 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
184 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
// The pool name and type are shared by every use_storage command below.
189 pm_strcpy(pool_type, jcr->pool->pool_type);
190 pm_strcpy(pool_name, jcr->pool->hdr.name);
191 bash_spaces(pool_type);
192 bash_spaces(pool_name);
195 * We have two loops here. The first comes from the
196 * Storage = associated with the Job, and we need
197 * to attach to each one.
198 * The inner loop loops over all the alternative devices
199 * associated with each Storage. It selects the first
203 /* Do read side of storage daemon */
205 foreach_alist(storage, rstore) {
206 pm_strcpy(store_name, storage->hdr.name);
207 bash_spaces(store_name);
208 pm_strcpy(media_type, storage->media_type);
209 bash_spaces(media_type);
210 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
211 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
214 /* Loop over alternative storage Devices until one is OK */
215 foreach_alist(dev, storage->device) {
216 pm_strcpy(device_name, dev->hdr.name);
217 bash_spaces(device_name);
218 bnet_fsend(sd, use_device, device_name.c_str());
219 Dmsg1(100, ">stored: %s", sd->msg);
221 bnet_sig(sd, BNET_EOD); /* end of Devices */
223 bnet_sig(sd, BNET_EOD); /* end of Storages */
// A single reply is read after all read-side storages and devices are sent.
224 if (bget_dirmsg(sd) > 0) {
225 Dmsg1(100, "<stored: %s", sd->msg);
226 /* ****FIXME**** save actual device name */
// NOTE(review): this scans into device_name.c_str(), i.e. writes through
// the const pointer returned by c_str(), and OK_device's %s is unbounded.
// Scan into a writable buffer at least as large as sd->msg instead.
227 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
233 /* Do write side of storage daemon */
235 foreach_alist(storage, wstore) {
236 pm_strcpy(store_name, storage->hdr.name);
237 bash_spaces(store_name);
238 pm_strcpy(media_type, storage->media_type);
239 bash_spaces(media_type);
240 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
241 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
244 /* Loop over alternative storage Devices until one is OK */
245 foreach_alist(dev, storage->device) {
246 pm_strcpy(device_name, dev->hdr.name);
247 bash_spaces(device_name);
248 bnet_fsend(sd, use_device, device_name.c_str());
249 Dmsg1(100, ">stored: %s", sd->msg);
251 bnet_sig(sd, BNET_EOD); /* end of Devices */
253 bnet_sig(sd, BNET_EOD); /* end of Storages */
254 if (bget_dirmsg(sd) > 0) {
255 Dmsg1(100, "<stored: %s", sd->msg);
256 /* ****FIXME**** save actual device name */
// NOTE(review): same unsafe scan into device_name.c_str() as on the read
// side (write through a const pointer, unbounded %s).
257 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
// Failure reporting. The first Jmsg includes the SD's saved reply text
// (err_msg). The second names the device without a reason -- presumably
// used when no reply was received.
265 pm_strcpy(err_msg, sd->msg); /* save message */
266 Jmsg(jcr, M_FATAL, 0, _("\n"
267 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
268 device_name.c_str(), err_msg.c_str()/* sd->msg */);
270 Jmsg(jcr, M_FATAL, 0, _("\n"
271 " Storage daemon didn't accept Device \"%s\" command.\n"),
272 device_name.c_str());
// start_storage_daemon_message_thread(): launch msg_thread() to service
// requests from the Storage daemon for this JCR.
// A JCR use count is taken on behalf of the thread; msg_thread_cleanup()
// releases it with free_jcr(). Failure to create the thread is reported
// with Jmsg1(M_ABORT). The function then waits until the new thread has
// stored its id in jcr->SD_msg_chan.
// NOTE(review): SD_msg_chan is written by msg_thread() and polled here
// without a lock or atomic access; confirm the wait loop sleeps or yields.
279 * Start a thread to handle Storage daemon messages and
282 int start_storage_daemon_message_thread(JCR *jcr)
287 jcr->inc_use_count(); /* mark in use by msg thread */
288 jcr->sd_msg_thread_done = false;
289 jcr->SD_msg_chan = 0;
290 Dmsg0(100, "Start SD msg_thread.\n");
291 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
293 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
295 /* Wait for thread to start */
296 while (jcr->SD_msg_chan == 0) {
299 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
// msg_thread_cleanup(): pthread cleanup handler registered by msg_thread();
// arg is the JCR. It does the following, in order:
//   - ends any open DB transaction,
//   - marks the message thread done and clears SD_msg_chan,
//   - wakes threads waiting on jcr->term_wait,
//   - drops the JCR reference taken in start_storage_daemon_message_thread().
// NOTE(review): confirm the flag updates and the broadcast happen with the
// file-scope mutex held. Otherwise a waiter in
// wait_for_storage_daemon_termination() can miss the wakeup and fall back
// to its 5-second timeout.
303 extern "C" void msg_thread_cleanup(void *arg)
305 JCR *jcr = (JCR *)arg;
306 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
307 jcr->sd_msg_thread_done = true;
308 jcr->SD_msg_chan = 0;
309 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
310 Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
311 free_jcr(jcr); /* release jcr */
// msg_thread(): thread body servicing Storage daemon messages for one JCR
// (passed as arg).
// Setup: the thread detaches itself and publishes its id in
// jcr->SD_msg_chan, which start_storage_daemon_message_thread() waits for.
// It then registers msg_thread_cleanup(), so cleanup also runs if the
// thread is cancelled.
// Loop: messages are read until the job is canceled or bget_dirmsg()
// returns a negative value. A Job_end message records the SD's JobStatus,
// JobFiles and JobBytes in the JCR.
// Exit: a socket error marks SDJobStatus as JS_ErrorTerminated, and the
// cleanup handler is popped and executed.
315 * Handle the message channel (i.e. requests from the
317 * Note, we are running in a separate thread.
319 extern "C" void *msg_thread(void *arg)
321 JCR *jcr = (JCR *)arg;
// NOTE(review): Job_start/Job_end scan %127s into this buffer; confirm
// MAX_NAME_LENGTH >= 128.
324 char Job[MAX_NAME_LENGTH];
329 pthread_detach(pthread_self());
330 jcr->SD_msg_chan = pthread_self();
331 pthread_cleanup_push(msg_thread_cleanup, arg);
332 sd = jcr->store_bsock;
334 /* Read the Storage daemon's output.
336 Dmsg0(100, "Start msg_thread loop\n");
337 while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
338 Dmsg1(400, "<stored: %s", sd->msg);
339 if (sscanf(sd->msg, Job_start, Job) == 1) {
342 if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
344 jcr->SDJobStatus = JobStatus; /* termination status */
345 jcr->SDJobFiles = JobFiles;
346 jcr->SDJobBytes = JobBytes;
349 Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
351 if (is_bnet_error(sd)) {
352 jcr->SDJobStatus = JS_ErrorTerminated;
354 pthread_cleanup_pop(1); /* remove and execute the handler */
// wait_for_storage_daemon_termination(): block until msg_thread() has
// finished (jcr->sd_msg_thread_done), waking at least every 5 seconds.
// On cancel: if the job is canceled while the message thread is still
// running, the SD socket is flagged timed_out/terminated and
// TIMEOUT_SIGNAL is sent to that thread. Per the comment below, the SD
// gets about 30 seconds (6 waits) to clean up after a cancel.
// The job status is then set to JS_Terminated (presumably after the loop).
// NOTE(review): POSIX requires the mutex to be locked by the caller of
// pthread_cond_timedwait(); confirm it is acquired before and released
// after the wait. Also confirm timeout.tv_nsec is initialised.
358 void wait_for_storage_daemon_termination(JCR *jcr)
360 int cancel_count = 0;
361 /* Now wait for Storage daemon to terminate our message thread */
362 while (!jcr->sd_msg_thread_done) {
365 struct timespec timeout;
367 gettimeofday(&tv, &tz);
369 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
370 Dmsg0(400, "I'm waiting for message thread termination.\n");
371 // Returns on broadcast from msg_thread_cleanup() or on timeout.
372 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
374 if (job_canceled(jcr)) {
375 if (jcr->SD_msg_chan) {
376 jcr->store_bsock->timed_out = 1;
377 jcr->store_bsock->terminated = 1;
378 Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
379 pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
383 /* Give SD 30 seconds to clean up after cancel */
384 if (cancel_count == 6) {
388 set_jcr_job_status(jcr, JS_Terminated);
// device_thread(): detached thread that refreshes Device resource
// information from the Storage daemon. Steps:
//   1. Create a control JCR named "*DeviceInit*".
//   2. Attempt to connect up to MAX_TRIES times, using
//      connect_to_storage_daemon(jcr, 10, 30, 1).
//   3. Call update_device_res() for every R_DEVICE resource.
//   4. Close the SD socket and clear jcr->store_bsock.
394 extern "C" void *device_thread(void *arg)
401 pthread_detach(pthread_self());
402 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
403 for (i=0; i < MAX_TRIES; i++) {
404 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
405 Dmsg0(900, "Failed connecting to SD.\n");
409 foreach_res(dev, R_DEVICE) {
410 if (!update_device_res(jcr, dev)) {
411 Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
413 Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
// Release the SD connection so the JCR no longer refers to it.
417 bnet_close(jcr->store_bsock);
418 jcr->store_bsock = NULL;
// init_device_resources(): start device_thread() to fetch Device resource
// information from the SD. Failure to create the thread is reported with
// Jmsg1(M_ABORT).
// NOTE(review): the abort text says "message thread", although the thread
// created here is device_thread; consider "Cannot create device thread".
427 * Start a thread to handle getting Device resource information
428 * from SD. This is called once at startup of the Director.
430 void init_device_resources()
435 Dmsg0(100, "Start Device thread.\n");
436 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
438 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));