3 * Bacula Director -- msgchan.c -- handles the message channel
4 * to the Storage daemon and the File daemon.
6 * Kern Sibbald, August MM
8 * This routine runs as a thread and must be thread reentrant.
10 * Basic tasks done here:
11 * Open a message channel with the Storage daemon
12 * to authenticate ourselves and to pass the JobId.
13 * Create a thread to interact with the Storage daemon
14 * which returns a job status and requests Catalog services, etc.
19 Copyright (C) 2000-2006 Kern Sibbald
21 This program is free software; you can redistribute it and/or
22 modify it under the terms of the GNU General Public License
23 version 2 as amended with additional clauses defined in the
24 file LICENSE in the main source directory.
26 This program is distributed in the hope that it will be useful,
27 but WITHOUT ANY WARRANTY; without even the implied warranty of
28 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 file LICENSE for additional details.
/*
 * Mutex paired with jcr->term_wait in the pthread_cond_timedwait() call of
 * wait_for_storage_daemon_termination(); that is its only use visible here.
 */
36 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
38 /* Commands sent to Storage daemon */
39 static char jobcmd[] = "JobId=%d job=%s job_name=%s client_name=%s "
40 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
41 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
42 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
43 "pool_type=%s append=%d copy=%d stripe=%d\n";
44 static char use_device[] = "use device=%s\n";
/*
 * NOTE(review): query_device is disabled here, yet update_device_res()
 * passes it to bnet_fsend(). Presumably that function is excluded by a
 * conditional not shown in this listing; confirm.
 * If it is re-enabled:
 *  - a wire-protocol string should not be wrapped in _() (translation);
 *  - it likely needs the trailing "\n" that the other commands carry.
 */
45 //static char query_device[] = _("query device=%s");
47 /* Response from Storage daemon */
/* %100s (Authorization) requires a destination buffer of at least 101 bytes. */
48 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
/* NOTE(review): %s here has no field width; callers must bound the copy. */
49 static char OK_device[] = "3000 OK use device device=%s\n";
51 /* Storage Daemon requests */
/* %127s requires a destination of at least 128 bytes (msg_thread's Job[]). */
52 static char Job_start[] = "3010 Job %127s start\n";
53 static char Job_end[] =
54 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld\n";
56 /* Forward referenced functions */
/* Thread entry point passed to pthread_create() below. */
57 extern "C" void *msg_thread(void *arg);
60 * Establish a message channel connection with the Storage daemon
61 * and perform authentication.
/*
 * Returns true at once if jcr->store_bsock is already set.
 * Otherwise it:
 *  - connects to the FIRST STORE in jcr->storage via bnet_connect(),
 *    passing retry_interval, max_retry_time and verbose through;
 *  - stores the STORE in sd->res and the socket in jcr->store_bsock;
 *  - authenticates with authenticate_storage_daemon().
 * If authentication fails, jcr->store_bsock is reset to NULL.
 * NOTE(review): the handling of a NULL bnet_connect() result, the closing
 * of the socket on authentication failure, and the return values on those
 * paths are not visible in this listing.
 */
63 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
64 int max_retry_time, int verbose)
69 if (jcr->store_bsock) {
70 return true; /* already connected */
/* Only the first storage in the job's list is contacted here. */
72 store = (STORE *)jcr->storage->first();
75 * Open message channel with the Storage daemon
77 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
79 sd = bnet_connect(jcr, retry_interval, max_retry_time,
80 _("Storage daemon"), store->address,
81 NULL, store->SDport, verbose);
85 sd->res = (RES *)store; /* save pointer to other end */
86 jcr->store_bsock = sd;
88 if (!authenticate_storage_daemon(jcr, store)) {
90 jcr->store_bsock = NULL;
97 * Here we ask the SD to send us the info for a
98 * particular device resource.
/*
 * Steps:
 *  - ensure an SD connection via connect_to_storage_daemon(jcr, 5, 30, 0);
 *  - send query_device with dev's space-bashed name;
 *  - read one reply with bget_dirmsg(). A result <= 0 takes the failure
 *    branch.
 * Per the comment below, the device data itself arrives through
 * Device_update.
 * NOTE(review): query_device's file-scope definition is commented out.
 * This function is presumably inside a disabled conditional not shown in
 * this listing; confirm before enabling it.
 */
101 bool update_device_res(JCR *jcr, DEVICE *dev)
103 POOL_MEM device_name;
105 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
108 sd = jcr->store_bsock;
109 pm_strcpy(device_name, dev->hdr.name);
110 bash_spaces(device_name);
111 bnet_fsend(sd, query_device, device_name.c_str());
112 Dmsg1(100, ">stored: %s\n", sd->msg);
113 /* The data is returned through Device_update */
114 if (bget_dirmsg(sd) <= 0) {
122 * Start a job with the Storage daemon
/*
 * Job handshake over jcr->store_bsock:
 *  - send the jobcmd line for jcr;
 *  - parse the OKjob reply into jcr->VolSessionId, jcr->VolSessionTime and
 *    jcr->sd_auth_key (a bstrdup() copy of the key).
 * Storage/device negotiation, done once for the read storages (rstore,
 * append=0) and once for the write storages (wstore, append=1):
 *  - send one use_storage line per storage;
 *  - follow it with one use_device line per candidate device of that
 *    storage;
 *  - send BNET_EOD to end each device list and again to end the storage
 *    list;
 *  - read one reply and check it against OK_device.
 * NOTE(review): not visible in this listing:
 *  - the declarations and initial values of sd, auth_key, copy, stripe,
 *    ok and err_msg;
 *  - the early-return paths;
 *  - whether a read-side failure returns before `ok` is reassigned by the
 *    write side.
 */
124 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
130 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
134 sd = jcr->store_bsock;
136 * Now send JobId and permissions, and get back the authorization key.
/* Resource names are space-bashed for the wire and unbashed right after the send. */
138 bash_spaces(jcr->job->hdr.name);
139 bash_spaces(jcr->client->hdr.name);
140 bash_spaces(jcr->fileset->hdr.name);
/* Never send an empty FileSetMD5 field; substitute a placeholder. */
141 if (jcr->fileset->MD5[0] == 0) {
142 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
/* NoAttr is the inverse of the pool's catalog_files setting. */
144 bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
145 jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
146 jcr->fileset->hdr.name, !jcr->pool->catalog_files,
147 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
148 jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
149 Dmsg1(100, ">stored: %s\n", sd->msg);
150 unbash_spaces(jcr->job->hdr.name);
151 unbash_spaces(jcr->client->hdr.name);
152 unbash_spaces(jcr->fileset->hdr.name);
153 if (bget_dirmsg(sd) > 0) {
154 Dmsg1(100, "<stored: %s", sd->msg);
/*
 * NOTE(review): auth_key's declaration is not visible. OKjob's %100s needs
 * at least 101 bytes. If auth_key is a char array, `&auth_key` has type
 * char (*)[N]; plain `auth_key` is the conventional argument for %s.
 * The %d conversions also assume VolSessionId/VolSessionTime are int-sized.
 */
155 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
156 &jcr->VolSessionTime, &auth_key) != 3) {
157 Dmsg1(100, "BadJob=%s\n", sd->msg);
158 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
161 jcr->sd_auth_key = bstrdup(auth_key);
162 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
165 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
/* Pool name/type are bashed once and reused in every use_storage line below. */
170 pm_strcpy(pool_type, jcr->pool->pool_type);
171 pm_strcpy(pool_name, jcr->pool->hdr.name);
172 bash_spaces(pool_type);
173 bash_spaces(pool_name);
176 * We have two loops here. The first comes from the
177 * Storage = associated with the Job, and we need
178 * to attach to each one.
179 * The inner loop loops over all the alternative devices
180 * associated with each Storage. It selects the first
184 /* Do read side of storage daemon */
186 foreach_alist(storage, rstore) {
187 pm_strcpy(store_name, storage->hdr.name);
188 bash_spaces(store_name);
189 pm_strcpy(media_type, storage->media_type);
190 bash_spaces(media_type);
/* append=0: these storages are for reading. */
191 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
192 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
195 /* Loop over alternative storage Devices until one is OK */
196 foreach_alist(dev, storage->device) {
197 pm_strcpy(device_name, dev->hdr.name);
198 bash_spaces(device_name);
199 bnet_fsend(sd, use_device, device_name.c_str());
200 Dmsg1(100, ">stored: %s", sd->msg);
202 bnet_sig(sd, BNET_EOD); /* end of Devices */
204 bnet_sig(sd, BNET_EOD); /* end of Storages */
205 if (bget_dirmsg(sd) > 0) {
206 Dmsg1(100, "<stored: %s", sd->msg);
207 /* ****FIXME**** save actual device name */
/*
 * NOTE(review): OK_device's %s has no field width. sscanf() can write more
 * bytes into device_name's buffer than pm_strcpy() sized for the last
 * device name sent. Bounding the conversion, or sizing the buffer from
 * sd->msg first, would prevent an overflow.
 * On the failure path, device_name presumably still holds the last
 * (space-bashed) device sent.
 */
208 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
211 pm_strcpy(err_msg, sd->msg); /* save message */
212 Jmsg(jcr, M_FATAL, 0, _("\n"
213 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
214 device_name.c_str(), err_msg.c_str()/* sd->msg */);
219 /* Do write side of storage daemon */
221 foreach_alist(storage, wstore) {
222 pm_strcpy(store_name, storage->hdr.name);
223 bash_spaces(store_name);
224 pm_strcpy(media_type, storage->media_type);
225 bash_spaces(media_type);
/* append=1: these storages are for writing. */
226 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
227 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
230 /* Loop over alternative storage Devices until one is OK */
231 foreach_alist(dev, storage->device) {
232 pm_strcpy(device_name, dev->hdr.name);
233 bash_spaces(device_name);
234 bnet_fsend(sd, use_device, device_name.c_str());
235 Dmsg1(100, ">stored: %s", sd->msg);
237 bnet_sig(sd, BNET_EOD); /* end of Devices */
239 bnet_sig(sd, BNET_EOD); /* end of Storages */
240 if (bget_dirmsg(sd) > 0) {
241 Dmsg1(100, "<stored: %s", sd->msg);
242 /* ****FIXME**** save actual device name */
/* NOTE(review): same unbounded %s write into device_name as on the read side. */
243 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
246 pm_strcpy(err_msg, sd->msg); /* save message */
247 Jmsg(jcr, M_FATAL, 0, _("\n"
248 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
249 device_name.c_str(), err_msg.c_str()/* sd->msg */);
257 * Start a thread to handle Storage daemon messages and
/*
 * Steps:
 *  - take an extra use-count reference on jcr for the new thread; it is
 *    released by free_jcr() in msg_thread_cleanup();
 *  - reset sd_msg_thread_done and SD_msg_chan;
 *  - start msg_thread(jcr);
 *  - poll until the thread has stored its id in jcr->SD_msg_chan.
 * A pthread_create() failure is reported via Jmsg1() at level M_ABORT.
 * NOTE(review):
 *  - If msg_thread() ends immediately (e.g. its first bget_dirmsg()
 *    fails), msg_thread_cleanup() resets SD_msg_chan to 0. If that happens
 *    before this loop sees the nonzero id, the loop can wait forever.
 *  - Comparing a pthread_t with 0 assumes pthread_t is an arithmetic type,
 *    which POSIX does not guarantee.
 *  - The loop body and the return value are not visible in this listing.
 */
260 int start_storage_daemon_message_thread(JCR *jcr)
265 jcr->inc_use_count(); /* mark in use by msg thread */
266 jcr->sd_msg_thread_done = false;
267 jcr->SD_msg_chan = 0;
268 Dmsg0(100, "Start SD msg_thread.\n");
269 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
271 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
273 /* Wait for thread to start */
274 while (jcr->SD_msg_chan == 0) {
277 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
/*
 * Cleanup handler registered by msg_thread() with pthread_cleanup_push()
 * and run when that thread finishes. It:
 *  - ends any open database transaction on jcr->db;
 *  - marks the message thread done and clears SD_msg_chan;
 *  - wakes all waiters on jcr->term_wait;
 *  - drops the JCR reference taken for the thread.
 * NOTE(review): no lock on `mutex` is visible around the broadcast, so a
 * waiter in wait_for_storage_daemon_termination() may miss it. That waiter
 * uses 5-second timed waits, so a miss only delays it.
 */
281 extern "C" void msg_thread_cleanup(void *arg)
283 JCR *jcr = (JCR *)arg;
284 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
285 jcr->sd_msg_thread_done = true;
286 jcr->SD_msg_chan = 0;
287 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
288 Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
289 free_jcr(jcr); /* release jcr */
293 * Handle the message channel (i.e. requests from the
295 * Note, we are running in a separate thread.
/*
 * Thread body started by start_storage_daemon_message_thread() with the
 * JCR as arg. Setup:
 *  - detach itself;
 *  - publish its id in jcr->SD_msg_chan;
 *  - register msg_thread_cleanup(), which pthread_cleanup_pop(1) runs at
 *    the end.
 * It then reads from jcr->store_bsock until the job is canceled or
 * bget_dirmsg() returns a negative value:
 *  - Job_end messages update jcr->SDJobStatus, SDJobFiles and SDJobBytes;
 *  - what is done with Job_start messages is not visible in this listing.
 * After the loop, a socket error forces SDJobStatus to JS_ErrorTerminated.
 * NOTE(review): Job is char[MAX_NAME_LENGTH], but Job_start/Job_end scan it
 * with %127s, which needs 128 bytes. Confirm MAX_NAME_LENGTH >= 128.
 */
297 extern "C" void *msg_thread(void *arg)
299 JCR *jcr = (JCR *)arg;
302 char Job[MAX_NAME_LENGTH];
307 pthread_detach(pthread_self());
308 jcr->SD_msg_chan = pthread_self();
309 pthread_cleanup_push(msg_thread_cleanup, arg);
310 sd = jcr->store_bsock;
312 /* Read the Storage daemon's output.
314 Dmsg0(100, "Start msg_thread loop\n");
315 while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
316 Dmsg1(400, "<stored: %s", sd->msg);
317 if (sscanf(sd->msg, Job_start, Job) == 1) {
320 if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
322 jcr->SDJobStatus = JobStatus; /* termination status */
323 jcr->SDJobFiles = JobFiles;
324 jcr->SDJobBytes = JobBytes;
327 Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
/* A socket-level error overrides whatever status the SD reported. */
329 if (is_bnet_error(sd)) {
330 jcr->SDJobStatus = JS_ErrorTerminated;
332 pthread_cleanup_pop(1); /* remove and execute the handler */
/*
 * Waits until jcr->sd_msg_thread_done becomes true (msg_thread_cleanup()
 * sets it), re-checking after each 5-second timed wait on jcr->term_wait.
 * While the job is canceled and a message thread id is still in
 * SD_msg_chan, it:
 *  - marks the SD socket timed_out and terminated;
 *  - sends TIMEOUT_SIGNAL to that thread, presumably to interrupt its
 *    blocking read.
 * cancel_count is compared against 6, which per the comment below gives
 * the SD 30 seconds to clean up after a cancel.
 * The job status is then set to JS_Terminated; the exact condition is not
 * visible in this listing.
 * NOTE(review): pthread_cond_timedwait() requires `mutex` to be locked by
 * the caller and timeout.tv_nsec to be valid. Neither the lock/unlock nor
 * a tv_nsec assignment is visible here; confirm both exist.
 */
336 void wait_for_storage_daemon_termination(JCR *jcr)
338 int cancel_count = 0;
339 /* Now wait for Storage daemon to terminate our message thread */
340 while (!jcr->sd_msg_thread_done) {
343 struct timespec timeout;
345 gettimeofday(&tv, &tz);
347 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
348 Dmsg0(400, "I'm waiting for message thread termination.\n");
350 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
352 if (job_canceled(jcr)) {
353 if (jcr->SD_msg_chan) {
354 jcr->store_bsock->timed_out = 1;
355 jcr->store_bsock->terminated = 1;
356 Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
357 pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
361 /* Give SD 30 seconds to clean up after cancel */
362 if (cancel_count == 6) {
366 set_jcr_job_status(jcr, JS_Terminated);
/*
 * Detached worker started by init_device_resources(). It creates a control
 * JCR named "*DeviceInit*" (JT_SYSTEM), then, for up to MAX_TRIES
 * iterations:
 *  - connects to the SD with connect_to_storage_daemon(jcr, 10, 30, 1);
 *  - calls update_device_res() for every R_DEVICE resource, logging each
 *    result only at debug level 900;
 *  - closes and clears jcr->store_bsock.
 * NOTE(review): not visible in this listing:
 *  - the retry/exit control flow between these steps;
 *  - any resource locking around foreach_res();
 *  - the release of the control JCR.
 */
372 extern "C" void *device_thread(void *arg)
379 pthread_detach(pthread_self());
380 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
381 for (i=0; i < MAX_TRIES; i++) {
382 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
383 Dmsg0(900, "Failed connecting to SD.\n");
387 foreach_res(dev, R_DEVICE) {
388 if (!update_device_res(jcr, dev)) {
389 Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
391 Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
395 bnet_close(jcr->store_bsock);
396 jcr->store_bsock = NULL;
405 * Start a thread to handle getting Device resource information
406 * from SD. This is called once at startup of the Director.
/*
 * Spawns device_thread() with a NULL argument. A pthread_create() failure
 * is reported through Jmsg1() at level M_ABORT.
 * NOTE(review): the declarations of status, thid and be are not visible in
 * this listing.
 */
408 void init_device_resources()
413 Dmsg0(100, "Start Device thread.\n");
414 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
/*
 * Name the thread actually being created, so this abort message is not
 * confused with the SD message-thread failure reported by
 * start_storage_daemon_message_thread().
 */
416 Jmsg1(NULL, M_ABORT, 0, _("Cannot create device thread: %s\n"), be.strerror(status));