3 * Bacula Director -- msgchan.c -- handles the message channel
4 * to the Storage daemon and the File daemon.
6 * Kern Sibbald, August MM
8 * This routine runs as a thread and must be thread reentrant.
10 * Basic tasks done here:
11 * Open a message channel with the Storage daemon
12 * to authenticate ourselves and to pass the JobId.
13 * Create a thread to interact with the Storage daemon
14 * who returns a job status and requests Catalog services, etc.
19 Copyright (C) 2000-2006 Kern Sibbald
21 This program is free software; you can redistribute it and/or
22 modify it under the terms of the GNU General Public License
23 version 2 as amended with additional clauses defined in the
24 file LICENSE in the main source directory.
26 This program is distributed in the hope that it will be useful,
27 but WITHOUT ANY WARRANTY; without even the implied warranty of
28 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 the file LICENSE for additional details.
// File-scope mutex passed as the mutex argument to pthread_cond_timedwait()
// in wait_for_storage_daemon_termination().
36 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
38 /* Commands sent to Storage daemon */
// Job command, filled in and sent by start_storage_daemon_job(). The name
// fields are passed through bash_spaces() before formatting (presumably so an
// embedded space cannot split a field -- confirm against bash_spaces()).
39 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
40 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
41 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
// Sent once per STORE (append=0 for the read side, 1 for the write side),
// followed by one use_device line per device of that STORE.
42 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
43 "pool_type=%s append=%d copy=%d stripe=%d\n";
44 static char use_device[] = "use device=%s\n";
// NOTE(review): query_device is commented out here, yet update_device_res()
// below still passes query_device to bnet_fsend(). Confirm that function is
// compiled out, or that the string is defined elsewhere, in the full file.
45 //static char query_device[] = _("query device=%s");
47 /* Response from Storage daemon */
// Reply to jobcmd, parsed with sscanf() in start_storage_daemon_job().
// NOTE(review): "%100s" stores up to 100 characters plus a terminating NUL
// (101 bytes). Confirm that the auth_key buffer it is scanned into is at least
// that large; its declaration is not visible in this view.
48 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
// NOTE(review): "%s" has no field width, and start_storage_daemon_job() scans
// it into the buffer behind device_name.c_str(). Nothing bounds that write to
// the buffer's size, so consider a width-limited conversion.
49 static char OK_device[] = "3000 OK use device device=%s\n";
51 /* Storage Daemon requests */
// Both are parsed by msg_thread() into char Job[MAX_NAME_LENGTH]. "%127s"
// needs 128 bytes, so NOTE(review): confirm MAX_NAME_LENGTH >= 128.
// `lld` is a macro defined outside this view, presumably the conversion for
// the 64-bit JobBytes value -- TODO confirm.
52 static char Job_start[] = "3010 Job %127s start\n";
53 static char Job_end[] =
54 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
56 /* Forward referenced functions */
// Declared with C linkage. It is the start routine passed to pthread_create()
// in start_storage_daemon_message_thread().
57 extern "C" void *msg_thread(void *arg);
60 * Establish a message channel connection with the Storage daemon
61 * and perform authentication.
//
// Behavior visible in this view:
// - Returns true at once if jcr->store_bsock is already set.
// - Otherwise it connects to the first STORE in jcr->storage, passing
//   retry_interval, max_retry_time and verbose straight through to
//   bnet_connect().
// - It records the socket in jcr->store_bsock, then calls
//   authenticate_storage_daemon().
// - If authentication fails, jcr->store_bsock is reset to NULL.
// NOTE(review): no check that jcr->storage is non-empty is visible here.
63 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
64 int max_retry_time, int verbose)
69 if (jcr->store_bsock) {
70 return true; /* already connected */
72 store = (STORE *)jcr->storage->first();
75 * Open message channel with the Storage daemon
77 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
79 sd = bnet_connect(jcr, retry_interval, max_retry_time,
80 _("Storage daemon"), store->address,
81 NULL, store->SDport, verbose);
// Keep a back-pointer from the socket to the STORE resource it talks to.
85 sd->res = (RES *)store; /* save pointer to other end */
86 jcr->store_bsock = sd;
88 if (!authenticate_storage_daemon(jcr, store)) {
// Do not leave an unauthenticated socket attached to the JCR.
90 jcr->store_bsock = NULL;
97 * Here we ask the SD to send us the info for a
98 * particular device resource.
//
// Steps visible in this view:
// - Ensures an SD connection via connect_to_storage_daemon(jcr, 5, 30, 0),
//   i.e. retry_interval=5, max_retry_time=30, verbose=0.
// - Sends query_device with dev's resource name (after bash_spaces()).
// - Reads one reply with bget_dirmsg().
// NOTE(review): query_device is commented out at file scope in this view.
// Confirm this function is excluded from the build, or that the string is
// restored elsewhere.
101 bool update_device_res(JCR *jcr, DEVICE *dev)
103 POOL_MEM device_name;
105 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
108 sd = jcr->store_bsock;
109 pm_strcpy(device_name, dev->hdr.name);
110 bash_spaces(device_name);
111 bnet_fsend(sd, query_device, device_name.c_str());
112 Dmsg1(100, ">stored: %s\n", sd->msg);
113 /* The data is returned through Device_update */
114 if (bget_dirmsg(sd) <= 0) {
122 * Start a job with the Storage daemon
//
// Uses the already-open jcr->store_bsock. Visible sequence:
// 1. If jcr->reschedule_count is set, cancels the previous incarnation of the
//    job on the SD and discards its replies.
// 2. Sends jobcmd and parses the OKjob reply into jcr->VolSessionId,
//    jcr->VolSessionTime and the authorization key. A bstrdup() copy of the
//    key is stored in jcr->sd_auth_key.
// 3. Sends "use storage" for every read STORE in rstore (append=0). Each one
//    is followed by its "use device" lines and BNET_EOD. A final BNET_EOD
//    ends the storage list, and the reply is matched against OK_device.
// 4. Repeats step 3 for every write STORE in wstore (append=1).
// Failures are reported with M_FATAL job messages.
124 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
130 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
131 POOL_MEM job_name, client_name, fileset_name;
136 sd = jcr->store_bsock;
138 * Now send JobId and permissions, and get back the authorization key.
140 pm_strcpy(job_name, jcr->job->hdr.name);
141 bash_spaces(job_name);
142 pm_strcpy(client_name, jcr->client->hdr.name);
143 bash_spaces(client_name);
144 pm_strcpy(fileset_name, jcr->fileset->hdr.name);
145 bash_spaces(fileset_name);
// The SD is always sent some FileSetMD5 value. "**Dummy**" stands in when the
// FileSet has none.
146 if (jcr->fileset->MD5[0] == 0) {
147 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
149 /* If rescheduling, cancel the previous incarnation of this job
150 * with the SD, which might be waiting on the FD connection.
151 * If we do not cancel it the SD will not accept a new connection
152 * for the same jobid.
154 if (jcr->reschedule_count) {
155 bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
// Discard replies to the cancel until bnet_recv() returns a negative value.
156 while (bnet_recv(sd) >= 0)
// NoAttr is sent as the logical inverse of the pool's catalog_files flag.
159 bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
160 job_name.c_str(), client_name.c_str(),
161 jcr->JobType, jcr->JobLevel,
162 fileset_name.c_str(), !jcr->pool->catalog_files,
163 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
164 jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
165 Dmsg1(100, ">stored: %s\n", sd->msg);
166 if (bget_dirmsg(sd) > 0) {
167 Dmsg1(100, "<stored: %s", sd->msg);
// NOTE(review): OKjob scans "%100s" (up to 101 bytes with the NUL) into
// auth_key. Its declaration is not visible here, so confirm it is large enough.
168 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
169 &jcr->VolSessionTime, &auth_key) != 3) {
170 Dmsg1(100, "BadJob=%s\n", sd->msg);
171 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
174 jcr->sd_auth_key = bstrdup(auth_key);
175 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
178 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
183 pm_strcpy(pool_type, jcr->pool->pool_type);
184 pm_strcpy(pool_name, jcr->pool->hdr.name);
185 bash_spaces(pool_type);
186 bash_spaces(pool_name);
189 * We have two loops here. The first comes from the
190 * Storage = associated with the Job, and we need
191 * to attach to each one.
192 * The inner loop loops over all the alternative devices
193 * associated with each Storage. It selects the first
197 /* Do read side of storage daemon */
199 foreach_alist(storage, rstore) {
200 pm_strcpy(store_name, storage->hdr.name);
201 bash_spaces(store_name);
202 pm_strcpy(media_type, storage->media_type);
203 bash_spaces(media_type);
204 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
205 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
208 /* Loop over alternative storage Devices until one is OK */
209 foreach_alist(dev, storage->device) {
210 pm_strcpy(device_name, dev->hdr.name);
211 bash_spaces(device_name);
212 bnet_fsend(sd, use_device, device_name.c_str());
213 Dmsg1(100, ">stored: %s", sd->msg);
215 bnet_sig(sd, BNET_EOD); /* end of Devices */
217 bnet_sig(sd, BNET_EOD); /* end of Storages */
218 if (bget_dirmsg(sd) > 0) {
219 Dmsg1(100, "<stored: %s", sd->msg);
220 /* ****FIXME**** save actual device name */
// NOTE(review): OK_device's "%s" is unbounded, and the scan writes through
// device_name.c_str(). Nothing bounds that write to the buffer's size.
221 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
227 /* Do write side of storage daemon */
229 foreach_alist(storage, wstore) {
230 pm_strcpy(store_name, storage->hdr.name);
231 bash_spaces(store_name);
232 pm_strcpy(media_type, storage->media_type);
233 bash_spaces(media_type);
234 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
235 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
238 /* Loop over alternative storage Devices until one is OK */
239 foreach_alist(dev, storage->device) {
240 pm_strcpy(device_name, dev->hdr.name);
241 bash_spaces(device_name);
242 bnet_fsend(sd, use_device, device_name.c_str());
243 Dmsg1(100, ">stored: %s", sd->msg);
245 bnet_sig(sd, BNET_EOD); /* end of Devices */
247 bnet_sig(sd, BNET_EOD); /* end of Storages */
248 if (bget_dirmsg(sd) > 0) {
249 Dmsg1(100, "<stored: %s", sd->msg);
250 /* ****FIXME**** save actual device name */
// NOTE(review): same unbounded "%s" scan into device_name as on the read side.
251 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
// device_name here holds whatever was last copied or scanned into it above.
259 pm_strcpy(err_msg, sd->msg); /* save message */
260 Jmsg(jcr, M_FATAL, 0, _("\n"
261 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
262 device_name.c_str(), err_msg.c_str()/* sd->msg */);
264 Jmsg(jcr, M_FATAL, 0, _("\n"
265 " Storage daemon didn't accept Device \"%s\" command.\n"),
266 device_name.c_str());
273 * Start a thread to handle Storage daemon messages and
//
// What this function does:
// - Takes an extra JCR reference for the new thread. msg_thread_cleanup()
//   releases it with free_jcr().
// - Clears sd_msg_thread_done and SD_msg_chan.
// - Creates msg_thread(), then waits until that thread has stored its id in
//   jcr->SD_msg_chan.
// - Reports a pthread_create() failure with M_ABORT.
276 int start_storage_daemon_message_thread(JCR *jcr)
281 jcr->inc_use_count(); /* mark in use by msg thread */
282 jcr->sd_msg_thread_done = false;
// 0 means "thread has not started yet"; msg_thread() overwrites it with
// pthread_self().
283 jcr->SD_msg_chan = 0;
284 Dmsg0(100, "Start SD msg_thread.\n");
285 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
287 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
289 /* Wait for thread to start */
// NOTE(review): SD_msg_chan is written by another thread and polled here with
// no lock visible in this view. Confirm the loop body handles this safely.
290 while (jcr->SD_msg_chan == 0) {
293 Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
// Cleanup handler for msg_thread(). It is registered with
// pthread_cleanup_push() and executed by pthread_cleanup_pop(1).
// It does the following, in order:
// - Ends any open database transaction on jcr->db.
// - Marks the message thread finished and clears SD_msg_chan.
// - Wakes threads waiting on jcr->term_wait.
// - Drops the JCR reference taken in start_storage_daemon_message_thread().
// NOTE(review): no lock on the file-scope mutex is visible around the
// broadcast. Waiters in wait_for_storage_daemon_termination() use a 5-second
// timeout, so a missed wakeup only delays them.
297 extern "C" void msg_thread_cleanup(void *arg)
299 JCR *jcr = (JCR *)arg;
300 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
301 jcr->sd_msg_thread_done = true;
302 jcr->SD_msg_chan = 0;
303 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
304 Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
// Must be last: this may release the JCR that jcr points to.
305 free_jcr(jcr); /* release jcr */
309 * Handle the message channel (i.e. requests from the
311 * Note, we are running in a separate thread.
//
// Thread body started by start_storage_daemon_message_thread().
// - Reads messages from jcr->store_bsock until the job is canceled or
//   bget_dirmsg() returns a negative value.
// - When a Job_end message is parsed, it records the SD's JobStatus,
//   JobFiles and JobBytes in the JCR.
// - A socket error forces SDJobStatus to JS_ErrorTerminated.
// - msg_thread_cleanup() runs via pthread_cleanup_pop(1).
313 extern "C" void *msg_thread(void *arg)
315 JCR *jcr = (JCR *)arg;
// Job_start and Job_end scan the job name into this buffer with "%127s".
// NOTE(review): confirm MAX_NAME_LENGTH >= 128 so that scan cannot overflow.
318 char Job[MAX_NAME_LENGTH];
// Detached, so nobody joins this thread. Completion is signalled through
// sd_msg_thread_done and term_wait in msg_thread_cleanup().
323 pthread_detach(pthread_self());
// Publishing our id ends the start-up wait loop in
// start_storage_daemon_message_thread().
324 jcr->SD_msg_chan = pthread_self();
325 pthread_cleanup_push(msg_thread_cleanup, arg);
326 sd = jcr->store_bsock;
328 /* Read the Storage daemon's output.
330 Dmsg0(100, "Start msg_thread loop\n");
331 while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
332 Dmsg1(400, "<stored: %s", sd->msg);
333 if (sscanf(sd->msg, Job_start, Job) == 1) {
336 if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
338 jcr->SDJobStatus = JobStatus; /* termination status */
339 jcr->SDJobFiles = JobFiles;
340 jcr->SDJobBytes = JobBytes;
343 Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
// A socket error overrides any status the SD reported in Job_end.
345 if (is_bnet_error(sd)) {
346 jcr->SDJobStatus = JS_ErrorTerminated;
348 pthread_cleanup_pop(1); /* remove and execute the handler */
// Waits until msg_thread() reports completion through
// jcr->sd_msg_thread_done, waking at least every 5 seconds on jcr->term_wait.
// Once the job is canceled, each wakeup while the message thread is still
// running does two things:
// - marks the SD socket as timed out and terminated;
// - sends TIMEOUT_SIGNAL to the message thread.
// The function also calls set_jcr_job_status(jcr, JS_Terminated).
352 void wait_for_storage_daemon_termination(JCR *jcr)
354 int cancel_count = 0;
355 /* Now wait for Storage daemon to terminate our message thread */
356 while (!jcr->sd_msg_thread_done) {
359 struct timespec timeout;
361 gettimeofday(&tv, &tz);
// Absolute deadline for pthread_cond_timedwait(): now + 5 seconds.
363 timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
364 Dmsg0(400, "I'm waiting for message thread termination.\n");
// NOTE(review): pthread_cond_timedwait() requires the caller to hold `mutex`.
// The lock/unlock around this call is not visible in this view -- confirm.
366 pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
368 if (job_canceled(jcr)) {
369 if (jcr->SD_msg_chan) {
// Flag the socket first so the signalled thread sees it as dead.
370 jcr->store_bsock->timed_out = 1;
371 jcr->store_bsock->terminated = 1;
372 Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
373 pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
377 /* Give SD 30 seconds to clean up after cancel */
// NOTE(review): the cancel_count increment is not visible in this view. The
// 30 s figure assumes one increment per 5-second wait after cancellation.
378 if (cancel_count == 6) {
382 set_jcr_job_status(jcr, JS_Terminated);
// Thread body started by init_device_resources().
// - Detaches itself and creates a control JCR named "*DeviceInit*".
// - Within a loop of up to MAX_TRIES iterations, connects to the SD with
//   connect_to_storage_daemon(jcr, 10, 30, 1) and calls update_device_res()
//   for every R_DEVICE resource.
// - Closes jcr->store_bsock and sets it to NULL.
// NOTE(review): the exact loop exit conditions are not visible in this view.
// update_device_res() depends on query_device, which is commented out at
// file scope.
388 extern "C" void *device_thread(void *arg)
395 pthread_detach(pthread_self());
396 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
397 for (i=0; i < MAX_TRIES; i++) {
398 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
399 Dmsg0(900, "Failed connecting to SD.\n");
403 foreach_res(dev, R_DEVICE) {
404 if (!update_device_res(jcr, dev)) {
405 Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
407 Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
// Clear the pointer so connect_to_storage_daemon() will not reuse a closed
// socket.
411 bnet_close(jcr->store_bsock);
412 jcr->store_bsock = NULL;
421 * Start a thread to handle getting Device resource information
422 * from SD. This is called once at startup of the Director.
//
// Starts device_thread() with a NULL argument. If thread creation fails, the
// error is reported with M_ABORT and no JCR.
// NOTE(review): the failure text says "message thread" although the thread
// being created is device_thread(). The wording matches the message in
// start_storage_daemon_message_thread() and looks copied from there.
424 void init_device_resources()
429 Dmsg0(100, "Start Device thread.\n");
430 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
432 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));