3 * Bacula Director -- msgchan.c -- handles the message channel
4 * to the Storage daemon and the File daemon.
6 * Kern Sibbald, August MM
8 * This routine runs as a thread and must be thread reentrant.
10 * Basic tasks done here:
11 * Open a message channel with the Storage daemon
12 * to authenticate ourselves and to pass the JobId.
13 * Create a thread to interact with the Storage daemon
14 * who returns a job status and requests Catalog services, etc.
19 Copyright (C) 2000-2006 Kern Sibbald
21 This program is free software; you can redistribute it and/or
22 modify it under the terms of the GNU General Public License
23 version 2 as amended with additional clauses defined in the
24 file LICENSE in the main source directory.
26 This program is distributed in the hope that it will be useful,
27 but WITHOUT ANY WARRANTY; without even the implied warranty of
28 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 file LICENSE for additional details.
/* Mutex handed to pthread_cond_timedwait() in wait_for_storage_daemon_termination(). */
36 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
38 /* Commands sent to Storage daemon (used as bnet_fsend() format strings) */
/* Job command; its arguments are supplied in start_storage_daemon_job(). */
39 static char jobcmd[] = "JobId=%d job=%s job_name=%s client_name=%s "
40 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
41 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
/* Sent once per Storage resource; append is 0 on the read side and 1 on the write side. */
42 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
43 "pool_type=%s append=%d copy=%d stripe=%d\n";
/* Sent once per Device belonging to the Storage resource just announced. */
44 static char use_device[] = "use device=%s\n";
/*
 * NOTE(review): query_device is commented out below, yet update_device_res()
 * still passes it to bnet_fsend().  Confirm that function is compiled out.
 */
45 //static char query_device[] = _("query device=%s");
47 /* Response from Storage daemon (used as sscanf() format strings) */
/*
 * NOTE(review): %100s stores up to 100 characters plus a terminating NUL, so
 * the auth_key buffer scanned in start_storage_daemon_job() must hold at least
 * 101 bytes.  Its declaration is not visible here -- confirm.
 */
48 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
/*
 * NOTE(review): %s has no field width; start_storage_daemon_job() scans it
 * straight into device_name.c_str().  Confirm that buffer cannot be overrun.
 */
49 static char OK_device[] = "3000 OK use device device=%s\n";
51 /* Storage Daemon requests (parsed by msg_thread() with sscanf()) */
/*
 * NOTE(review): %127s needs a 128-byte destination; msg_thread() scans into
 * char Job[MAX_NAME_LENGTH] -- confirm MAX_NAME_LENGTH >= 128.
 */
52 static char Job_start[] = "3010 Job %127s start\n";
53 static char Job_end[] =
54 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld\n";
56 /* Forward referenced functions */
/* Thread entry point started by start_storage_daemon_message_thread(). */
57 extern "C" void *msg_thread(void *arg);
60 * Establish a message channel connection with the Storage daemon
61 * and perform authentication.
/*
 * Connect jcr to its Storage daemon and authenticate the connection.
 *
 * If jcr->store_bsock is already set the function returns true at once.
 * Otherwise the first entry of jcr->storage is the target: bnet_connect()
 * is called with retry_interval, max_retry_time and verbose passed through,
 * the resulting socket is stored in jcr->store_bsock, and
 * authenticate_storage_daemon() is run.  When authentication fails,
 * jcr->store_bsock is reset to NULL.
 *
 * NOTE(review): the failure-path return statements and any NULL check on the
 * bnet_connect() result are not visible in this excerpt -- presumably the
 * function returns false on failure; confirm.
 */
63 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
64 int max_retry_time, int verbose)
/* An existing socket is reused rather than reconnecting. */
69 if (jcr->store_bsock) {
70 return true; /* already connected */
/* Only the first Storage resource in the job's list is contacted. */
72 store = (STORE *)jcr->storage->first();
75 * Open message channel with the Storage daemon
77 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
79 sd = bnet_connect(jcr, retry_interval, max_retry_time,
80 _("Storage daemon"), store->address,
81 NULL, store->SDport, verbose);
/* Record the Storage resource on the socket, then publish the socket in the JCR. */
85 sd->res = (RES *)store; /* save pointer to other end */
86 jcr->store_bsock = sd;
/* Authentication runs after jcr->store_bsock has been set. */
88 if (!authenticate_storage_daemon(jcr, store)) {
/* Clear the JCR's socket pointer on authentication failure. */
90 jcr->store_bsock = NULL;
97 * Here we ask the SD to send us the info for a
98 * particular device resource.
/*
 * Send a device query for dev to the Storage daemon and read one reply.
 *
 * Connects (or reuses the connection) via connect_to_storage_daemon() with
 * retry_interval=5, max_retry_time=30, verbose=0, sends the query_device
 * command with the device's name, then reads a message with bget_dirmsg().
 *
 * NOTE(review): query_device is only present as a commented-out definition at
 * file scope, so this function cannot compile as shown.  Confirm it is
 * excluded from the build (any preprocessor guard is not visible here).
 * NOTE(review): return statements are not visible in this excerpt.
 */
101 bool update_device_res(JCR *jcr, DEVICE *dev)
103 POOL_MEM device_name;
105 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
108 sd = jcr->store_bsock;
/* Work on a copy of the name so bash_spaces() does not modify the resource itself. */
109 pm_strcpy(device_name, dev->hdr.name);
110 bash_spaces(device_name);
111 bnet_fsend(sd, query_device, device_name.c_str());
112 Dmsg1(100, ">stored: %s\n", sd->msg);
113 /* The data is returned through Device_update */
/* A return value <= 0 from bget_dirmsg() is treated as the failure case. */
114 if (bget_dirmsg(sd) <= 0) {
122 * Start a job with the Storage daemon
/*
 * Announce the job to the Storage daemon over jcr->store_bsock and tell it
 * which Storage/Device resources to use.
 *
 * Steps visible here:
 *   1. If jcr->reschedule_count is non-zero, send "cancel Job=" for jcr->Job
 *      and drain the replies.
 *   2. Send the jobcmd line and parse the OKjob reply into
 *      jcr->VolSessionId, jcr->VolSessionTime and an authorization key,
 *      which is duplicated into jcr->sd_auth_key.
 *   3. For every Storage in rstore (append=0), then every Storage in wstore
 *      (append=1), send use_storage followed by one use_device per Device,
 *      then check the OK_device reply.
 *
 * rstore - list of Storage resources for the read side
 * wstore - list of Storage resources for the write side
 *
 * NOTE(review): local declarations, return statements and closing braces are
 * not visible in this excerpt; the exact nesting of the reply handling
 * relative to the foreach loops should be confirmed against the full file.
 */
124 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
130 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
134 sd = jcr->store_bsock;
136 * Now send JobId and permissions, and get back the authorization key.
/*
 * NOTE(review): these three resource names are modified in place and only
 * restored by the unbash_spaces() calls after the send.  Confirm nothing
 * else reads them in between.
 */
138 bash_spaces(jcr->job->hdr.name);
139 bash_spaces(jcr->client->hdr.name);
140 bash_spaces(jcr->fileset->hdr.name);
/* An empty FileSet MD5 is replaced by a placeholder before it is sent via %s. */
141 if (jcr->fileset->MD5[0] == 0) {
142 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
144 /* If rescheduling, cancel the previous incarnation of this job
145 * with the SD, which might be waiting on the FD connection.
146 * If we do not cancel it the SD will not accept a new connection
147 * for the same jobid.
149 if (jcr->reschedule_count) {
150 bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
/* Drain replies to the cancel until bnet_recv() returns a negative value. */
151 while (bnet_recv(sd) >= 0)
/* NoAttr is sent as the negation of the pool's catalog_files setting. */
154 bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
155 jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
156 jcr->fileset->hdr.name, !jcr->pool->catalog_files,
157 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
158 jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
159 Dmsg1(100, ">stored: %s\n", sd->msg);
/* Restore the names altered by bash_spaces() above. */
160 unbash_spaces(jcr->job->hdr.name);
161 unbash_spaces(jcr->client->hdr.name);
162 unbash_spaces(jcr->fileset->hdr.name);
163 if (bget_dirmsg(sd) > 0) {
164 Dmsg1(100, "<stored: %s", sd->msg);
/*
 * All three OKjob fields must convert, otherwise the reply is reported as a
 * rejection.
 * NOTE(review): OKjob scans the key with %100s, which needs a 101-byte
 * destination; auth_key's declaration is not visible here -- confirm size.
 */
165 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
166 &jcr->VolSessionTime, &auth_key) != 3) {
167 Dmsg1(100, "BadJob=%s\n", sd->msg);
168 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
/* Keep a private copy of the key in the JCR. */
171 jcr->sd_auth_key = bstrdup(auth_key);
172 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
175 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
/* Pool name and type are copied so bash_spaces() does not touch the Pool resource. */
180 pm_strcpy(pool_type, jcr->pool->pool_type);
181 pm_strcpy(pool_name, jcr->pool->hdr.name);
182 bash_spaces(pool_type);
183 bash_spaces(pool_name);
186 * We have two loops here. The first comes from the
187 * Storage = associated with the Job, and we need
188 * to attach to each one.
189 * The inner loop loops over all the alternative devices
190 * associated with each Storage. It selects the first
194 /* Do read side of storage daemon */
196 foreach_alist(storage, rstore) {
197 pm_strcpy(store_name, storage->hdr.name);
198 bash_spaces(store_name);
199 pm_strcpy(media_type, storage->media_type);
200 bash_spaces(media_type);
/* The literal 0 fills the append=%d field: this Storage is used for reading. */
201 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
202 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
205 /* Loop over alternative storage Devices until one is OK */
206 foreach_alist(dev, storage->device) {
207 pm_strcpy(device_name, dev->hdr.name);
208 bash_spaces(device_name);
209 bnet_fsend(sd, use_device, device_name.c_str());
210 Dmsg1(100, ">stored: %s", sd->msg);
212 bnet_sig(sd, BNET_EOD); /* end of Devices */
214 bnet_sig(sd, BNET_EOD); /* end of Storages */
215 if (bget_dirmsg(sd) > 0) {
216 Dmsg1(100, "<stored: %s", sd->msg);
217 /* ****FIXME**** save actual device name */
/*
 * NOTE(review): the unbounded %s in OK_device is scanned directly into
 * device_name's internal buffer -- confirm the reply cannot exceed it.
 */
218 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
/* The reply text is copied into err_msg before being passed to Jmsg(). */
221 pm_strcpy(err_msg, sd->msg); /* save message */
222 Jmsg(jcr, M_FATAL, 0, _("\n"
223 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
224 device_name.c_str(), err_msg.c_str()/* sd->msg */);
229 /* Do write side of storage daemon */
231 foreach_alist(storage, wstore) {
232 pm_strcpy(store_name, storage->hdr.name);
233 bash_spaces(store_name);
234 pm_strcpy(media_type, storage->media_type);
235 bash_spaces(media_type);
/* The literal 1 fills the append=%d field: this Storage is used for writing. */
236 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
237 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
240 /* Loop over alternative storage Devices until one is OK */
241 foreach_alist(dev, storage->device) {
242 pm_strcpy(device_name, dev->hdr.name);
243 bash_spaces(device_name);
244 bnet_fsend(sd, use_device, device_name.c_str());
245 Dmsg1(100, ">stored: %s", sd->msg);
247 bnet_sig(sd, BNET_EOD); /* end of Devices */
249 bnet_sig(sd, BNET_EOD); /* end of Storages */
250 if (bget_dirmsg(sd) > 0) {
251 Dmsg1(100, "<stored: %s", sd->msg);
252 /* ****FIXME**** save actual device name */
/* Same unbounded %s scan into device_name as on the read side; see the note there. */
253 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
256 pm_strcpy(err_msg, sd->msg); /* save message */
257 Jmsg(jcr, M_FATAL, 0, _("\n"
258 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
259 device_name.c_str(), err_msg.c_str()/* sd->msg */);
267 * Start a thread to handle Storage daemon messages and
/*
 * Create the thread that runs msg_thread() for this job and wait until that
 * thread has recorded its id in jcr->SD_msg_chan.
 *
 * The JCR use count is incremented before the thread is created;
 * msg_thread_cleanup() calls free_jcr() when the thread ends.
 * A pthread_create() failure is reported with Jmsg1() at M_ABORT level.
 *
 * NOTE(review): the return value is not visible in this excerpt.
 */
270 int start_storage_daemon_message_thread(JCR *jcr)
275 jcr->inc_use_count(); /* mark in use by msg thread */
/* Reset both fields before the thread starts; the loop below polls SD_msg_chan. */
276 jcr->sd_msg_thread_done = false;
277 jcr->SD_msg_chan = 0;
278 Dmsg0(100, "Start SD msg_thread.\n");
279 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
281 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
283 /* Wait for thread to start */
/*
 * msg_thread() assigns pthread_self() to SD_msg_chan as one of its first steps.
 * NOTE(review): the loop body is not visible, and no synchronization on
 * SD_msg_chan is visible here -- confirm.
 */
284 while (jcr->SD_msg_chan == 0) {
287 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
/*
 * Cleanup handler registered by msg_thread() with pthread_cleanup_push().
 *
 * arg is the JCR pointer.  The handler ends any open catalog transaction,
 * marks the message thread as done, clears jcr->SD_msg_chan, wakes every
 * thread waiting on jcr->term_wait and finally calls free_jcr().
 */
291 extern "C" void msg_thread_cleanup(void *arg)
293 JCR *jcr = (JCR *)arg;
294 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
/* wait_for_storage_daemon_termination() loops until this flag becomes true. */
295 jcr->sd_msg_thread_done = true;
296 jcr->SD_msg_chan = 0;
297 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
298 Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
/* Pairs with inc_use_count() in start_storage_daemon_message_thread(). */
299 free_jcr(jcr); /* release jcr */
303 * Handle the message channel (i.e. requests from the
305 * Note, we are running in a separate thread.
/*
 * Thread body that reads Storage daemon messages for one job.
 *
 * arg is the JCR pointer.  The thread detaches itself, publishes its id in
 * jcr->SD_msg_chan, registers msg_thread_cleanup() and then reads from
 * jcr->store_bsock with bget_dirmsg() until the job is canceled or
 * bget_dirmsg() returns a negative value.  A message matching Job_end
 * supplies jcr->SDJobStatus, jcr->SDJobFiles and jcr->SDJobBytes.  If the
 * socket reports an error afterwards, SDJobStatus becomes JS_ErrorTerminated.
 *
 * NOTE(review): the continuation of the Job_end sscanf() call and the
 * continue/break statements are not visible in this excerpt.
 */
307 extern "C" void *msg_thread(void *arg)
309 JCR *jcr = (JCR *)arg;
/* NOTE(review): Job_start/Job_end scan with %127s -- confirm MAX_NAME_LENGTH >= 128. */
312 char Job[MAX_NAME_LENGTH];
317 pthread_detach(pthread_self());
/* start_storage_daemon_message_thread() polls this field to detect thread start. */
318 jcr->SD_msg_chan = pthread_self();
319 pthread_cleanup_push(msg_thread_cleanup, arg);
320 sd = jcr->store_bsock;
322 /* Read the Storage daemon's output.
324 Dmsg0(100, "Start msg_thread loop\n");
325 while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
326 Dmsg1(400, "<stored: %s", sd->msg);
/* A "Job ... start" message matches here; no JCR field is updated from it. */
327 if (sscanf(sd->msg, Job_start, Job) == 1) {
/* A "Job ... end" message carries the SD's final status, file and byte counts. */
330 if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
332 jcr->SDJobStatus = JobStatus; /* termination status */
333 jcr->SDJobFiles = JobFiles;
334 jcr->SDJobBytes = JobBytes;
337 Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
339 if (is_bnet_error(sd)) {
340 jcr->SDJobStatus = JS_ErrorTerminated;
/* Argument 1 makes pthread_cleanup_pop() run msg_thread_cleanup() on normal exit too. */
342 pthread_cleanup_pop(1); /* remove and execute the handler */
/*
 * Block until the message thread has set jcr->sd_msg_thread_done, then set
 * the job status to JS_Terminated.
 *
 * Each iteration waits on jcr->term_wait with a timeout of the current time
 * plus 5 seconds.  If the job has been canceled and the message thread is
 * still registered in jcr->SD_msg_chan, the SD socket is flagged timed_out
 * and terminated, and the thread is sent TIMEOUT_SIGNAL.
 *
 * NOTE(review): pthread_cond_timedwait() requires the mutex to be locked by
 * the caller; the lock/unlock calls, the timeout.tv_nsec assignment and the
 * cancel_count increment are not visible in this excerpt -- confirm.
 */
346 void wait_for_storage_daemon_termination(JCR *jcr)
348 int cancel_count = 0;
349 /* Now wait for Storage daemon to terminate our message thread */
/* msg_thread_cleanup() sets sd_msg_thread_done and broadcasts term_wait. */
350 while (!jcr->sd_msg_thread_done) {
353 struct timespec timeout;
355 gettimeofday(&tv, &tz);
357 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
358 Dmsg0(400, "I'm waiting for message thread termination.\n");
360 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
362 if (job_canceled(jcr)) {
/* SD_msg_chan is non-zero only while the message thread is running. */
363 if (jcr->SD_msg_chan) {
364 jcr->store_bsock->timed_out = 1;
365 jcr->store_bsock->terminated = 1;
366 Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
/* Signal the message thread directly. */
367 pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
371 /* Give SD 30 seconds to clean up after cancel */
/* Six passes at 5 seconds each give 30 seconds, assuming one increment per pass -- TODO confirm. */
372 if (cancel_count == 6) {
376 set_jcr_job_status(jcr, JS_Terminated);
/*
 * Thread body that refreshes Device resource information from the Storage
 * daemon.
 *
 * The thread detaches itself, creates a control JCR named "*DeviceInit*" of
 * type JT_SYSTEM, and makes up to MAX_TRIES attempts to connect with
 * connect_to_storage_daemon(jcr, 10, 30, 1).  It calls update_device_res()
 * for every R_DEVICE resource, then closes the SD socket and clears
 * jcr->store_bsock.
 *
 * NOTE(review): the loop-exit statements, JCR release and return value are
 * not visible in this excerpt.
 */
382 extern "C" void *device_thread(void *arg)
389 pthread_detach(pthread_self());
390 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
391 for (i=0; i < MAX_TRIES; i++) {
392 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
393 Dmsg0(900, "Failed connecting to SD.\n");
/* Query every configured Device resource; failures are only logged at debug level here. */
397 foreach_res(dev, R_DEVICE) {
398 if (!update_device_res(jcr, dev)) {
399 Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
401 Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
/* Close the socket and clear the pointer so a later connect call opens a new one. */
405 bnet_close(jcr->store_bsock);
406 jcr->store_bsock = NULL;
415 * Start a thread to handle getting Device resource information
416 * from SD. This is called once at startup of the Director.
/*
 * Create the thread that runs device_thread() (passed a NULL argument).
 * A pthread_create() failure is reported with Jmsg1() at M_ABORT level.
 *
 * NOTE(review): the error text says "message thread" although the thread
 * being created runs device_thread() -- the wording appears to be shared
 * with start_storage_daemon_message_thread(); confirm whether that is
 * intended.
 */
418 void init_device_resources()
423 Dmsg0(100, "Start Device thread.\n");
424 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
426 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));