3 * Bacula Director -- msgchan.c -- handles the message channel
4 * to the Storage daemon and the File daemon.
6 * Kern Sibbald, August MM
8 * This routine runs as a thread and must be thread reentrant.
10 * Basic tasks done here:
11 * Open a message channel with the Storage daemon
12 * to authenticate ourselves and to pass the JobId.
13 * Create a thread to interact with the Storage daemon
14 * which returns a job status and requests Catalog services, etc.
19 Copyright (C) 2000-2005 Kern Sibbald
21 This program is free software; you can redistribute it and/or
22 modify it under the terms of the GNU General Public License
23 version 2 as amended with additional clauses defined in the
24 file LICENSE in the main source directory.
26 This program is distributed in the hope that it will be useful,
27 but WITHOUT ANY WARRANTY; without even the implied warranty of
28 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 file LICENSE for additional details.
/*
 * Format strings for the dialogue with the Storage daemon (SD).
 * The command formats are handed to bnet_fsend(); the response and
 * request formats are handed to sscanf() together with sd->msg.
 */
36 /* Commands sent to Storage daemon */
/* Job command; its arguments are supplied by start_storage_daemon_job(). */
37 static char jobcmd[] = "JobId=%d job=%s job_name=%s client_name=%s "
38 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
39 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
/*
 * Sent once per Storage in the read and write lists. The callers in
 * this file pass append=0 for the read side and append=1 for the
 * write side.
 */
40 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
41 "pool_type=%s append=%d copy=%d stripe=%d\n";
/* Sent once per alternative Device of a Storage. */
42 static char use_device[] = "use device=%s\n";
/*
 * NOTE(review): query_device is disabled here, yet update_device_res()
 * still passes it to bnet_fsend(). Presumably that function is compiled
 * out by a preprocessor guard that is not visible in this excerpt;
 * confirm before enabling either one.
 */
43 //static char query_device[] = _("query device=%s");
45 /* Response from Storage daemon */
/*
 * Reply to jobcmd. %100s stores up to 100 characters plus a terminating
 * NUL, so the sscanf() destination must hold at least 101 bytes.
 */
46 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
/*
 * Reply to the use storage / use device sequence.
 * NOTE(review): %s carries no field width, so sscanf() does not bound
 * the number of characters written to the destination.
 */
47 static char OK_device[] = "3000 OK use device device=%s\n";
49 /* Storage Daemon requests */
/*
 * Both formats are matched against incoming messages in msg_thread().
 * %127s stores up to 127 characters plus a terminating NUL.
 * lld is spliced between string literals, so it is presumably a macro
 * expanding to a string literal defined elsewhere; confirm.
 */
50 static char Job_start[] = "3010 Job %127s start\n";
51 static char Job_end[] =
52 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
54 /* Forward referenced functions */
/* Needed because start_storage_daemon_message_thread() refers to it first. */
55 extern "C" void *msg_thread(void *arg);
58 * Establish a message channel connection with the Storage daemon
59 * and perform authentication.
/*
 * Returns true at once when jcr->store_bsock is already set. Otherwise
 * it connects to the first entry of jcr->storage through bnet_connect(),
 * passing retry_interval, max_retry_time and verbose straight through,
 * stores the new socket in jcr->store_bsock and then calls
 * authenticate_storage_daemon(). When authentication fails the socket
 * pointer in the JCR is reset to NULL.
 * NOTE(review): the failure return paths and any socket close call are
 * not visible in this excerpt.
 */
61 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
62 int max_retry_time, int verbose)
67 if (jcr->store_bsock) {
68 return true; /* already connected */
/* Only the first Storage resource in the list is contacted. */
70 store = (STORE *)jcr->storage->first();
73 * Open message channel with the Storage daemon
75 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
77 sd = bnet_connect(jcr, retry_interval, max_retry_time,
78 _("Storage daemon"), store->address,
79 NULL, store->SDport, verbose);
/* Record the Storage resource on the socket and the socket on the JCR. */
83 sd->res = (RES *)store; /* save pointer to other end */
84 jcr->store_bsock = sd;
86 if (!authenticate_storage_daemon(jcr, store)) {
/* Clearing the pointer lets a later call attempt a fresh connection. */
88 jcr->store_bsock = NULL;
95 * Here we ask the SD to send us the info for a
96 * particular device resource.
/*
 * Sends a query for the Device resource dev to the Storage daemon and
 * reads one reply with bget_dirmsg().
 * The connection is obtained through connect_to_storage_daemon() with a
 * retry interval of 5, a maximum retry time of 30 and verbose off.
 * NOTE(review): query_device, used below, is commented out at file
 * scope; this function presumably sits inside a disabled preprocessor
 * section that is not visible here. The return statements are not
 * visible either.
 */
99 bool update_device_res(JCR *jcr, DEVICE *dev)
101 POOL_MEM device_name;
103 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
106 sd = jcr->store_bsock;
/* Work on a copy so that bash_spaces() does not alter dev->hdr.name. */
107 pm_strcpy(device_name, dev->hdr.name);
108 bash_spaces(device_name);
109 bnet_fsend(sd, query_device, device_name.c_str());
110 Dmsg1(100, ">stored: %s\n", sd->msg);
111 /* The data is returned through Device_update */
/* A result of zero or less is taken as the no-reply / error case. */
112 if (bget_dirmsg(sd) <= 0) {
120 * Start a job with the Storage daemon
/*
 * Sends the Job command to the Storage daemon over jcr->store_bsock,
 * stores the returned session id, session time and authorization key in
 * the JCR, then announces every Storage in rstore (read side, append=0)
 * and wstore (write side, append=1) together with its Devices, and
 * finally sends the run command.
 * jcr   - job control record; store_bsock must already be connected
 * rstore - list of Storage resources to read from
 * wstore - list of Storage resources to write to
 * NOTE(review): several declarations, closing braces and return
 * statements are missing from this excerpt, so the exact return value on
 * each path cannot be confirmed from here.
 */
122 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
128 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
132 sd = jcr->store_bsock;
134 * Now send JobId and permissions, and get back the authorization key.
/*
 * The job, client and fileset names are processed in their own resource
 * buffers and passed through unbash_spaces() again right after the send.
 * bash_spaces() presumably replaces blanks so the names survive the
 * space-delimited command; confirm against its definition.
 */
136 bash_spaces(jcr->job->hdr.name);
137 bash_spaces(jcr->client->hdr.name);
138 bash_spaces(jcr->fileset->hdr.name);
/* An empty MD5 is replaced by a placeholder so the field is never blank. */
139 if (jcr->fileset->MD5[0] == 0) {
140 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
/* NoAttr is sent as the negation of the pool catalog_files setting. */
142 bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
143 jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
144 jcr->fileset->hdr.name, !jcr->pool->catalog_files,
145 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
146 jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
147 Dmsg1(100, ">stored: %s\n", sd->msg);
148 unbash_spaces(jcr->job->hdr.name);
149 unbash_spaces(jcr->client->hdr.name);
150 unbash_spaces(jcr->fileset->hdr.name);
151 if (bget_dirmsg(sd) > 0) {
152 Dmsg1(100, "<stored: %s", sd->msg);
/*
 * All three fields of OKjob must convert, otherwise the job is failed.
 * NOTE(review): OKjob uses %100s, which needs a destination of at least
 * 101 bytes; the declaration of auth_key is not visible here, confirm
 * its size.
 */
153 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
154 &jcr->VolSessionTime, &auth_key) != 3) {
155 Dmsg1(100, "BadJob=%s\n", sd->msg);
156 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
/* The key is duplicated with bstrdup() and kept in the JCR. */
159 jcr->sd_auth_key = bstrdup(auth_key);
160 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
163 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
/* Pool strings are copied first, so the Pool resource itself is not altered. */
168 pm_strcpy(pool_type, jcr->pool->pool_type);
169 pm_strcpy(pool_name, jcr->pool->hdr.name);
170 bash_spaces(pool_type);
171 bash_spaces(pool_name);
174 * We have two loops here. The first comes from the
175 * Storage = associated with the Job, and we need
176 * to attach to each one.
177 * The inner loop loops over all the alternative devices
178 * associated with each Storage. It selects the first
182 /* Do read side of storage daemon */
184 foreach_alist(storage, rstore) {
185 pm_strcpy(store_name, storage->hdr.name);
186 bash_spaces(store_name);
187 pm_strcpy(media_type, storage->media_type);
188 bash_spaces(media_type);
/* The fifth argument fills append=%d; 0 marks this Storage as read side. */
189 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
190 pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
193 /* Loop over alternative storage Devices until one is OK */
194 foreach_alist(dev, storage->device) {
195 pm_strcpy(device_name, dev->hdr.name);
196 bash_spaces(device_name);
197 bnet_fsend(sd, use_device, device_name.c_str());
198 Dmsg1(100, ">stored: %s", sd->msg);
/* Two end-of-data signals: one closes the Device list, one the Storage list. */
200 bnet_sig(sd, BNET_EOD); /* end of Devices */
201 bnet_sig(sd, BNET_EOD); /* end of Storages */
202 if (bget_dirmsg(sd) > 0) {
203 Dmsg1(100, "<stored: %s", sd->msg);
204 /* ****FIXME**** save actual device name */
/*
 * NOTE(review): sscanf() writes through the pointer returned by
 * device_name.c_str() and OK_device uses an unbounded %s, so the reply
 * length is not checked against the buffer size. The scanned name also
 * replaces the previous contents of device_name.
 */
205 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
/* sd->msg is copied into err_msg before it is passed to Jmsg(). */
208 pm_strcpy(err_msg, sd->msg); /* save message */
209 Jmsg(jcr, M_FATAL, 0, _("\n"
210 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
211 device_name.c_str(), err_msg.c_str()/* sd->msg */);
218 /* Do write side of storage daemon */
220 foreach_alist(storage, wstore) {
221 pm_strcpy(store_name, storage->hdr.name);
222 bash_spaces(store_name);
223 pm_strcpy(media_type, storage->media_type);
224 bash_spaces(media_type);
/* Same sequence as the read side, but append=%d is filled with 1. */
225 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
226 pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
229 /* Loop over alternative storage Devices until one is OK */
230 foreach_alist(dev, storage->device) {
231 pm_strcpy(device_name, dev->hdr.name);
232 bash_spaces(device_name);
233 bnet_fsend(sd, use_device, device_name.c_str());
234 Dmsg1(100, ">stored: %s", sd->msg);
236 bnet_sig(sd, BNET_EOD); /* end of Devices */
237 bnet_sig(sd, BNET_EOD); /* end of Storages */
238 if (bget_dirmsg(sd) > 0) {
239 Dmsg1(100, "<stored: %s", sd->msg);
240 /* ****FIXME**** save actual device name */
/* NOTE(review): same unbounded %s write into device_name as on the read side. */
241 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
244 pm_strcpy(err_msg, sd->msg); /* save message */
245 Jmsg(jcr, M_FATAL, 0, _("\n"
246 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
247 device_name.c_str(), err_msg.c_str()/* sd->msg */);
/* The result of sending the run command becomes the value of ok. */
254 ok = bnet_fsend(sd, "run");
255 Dmsg1(100, ">stored: %s\n", sd->msg);
261 * Start a thread to handle Storage daemon messages and
/*
 * Creates the thread running msg_thread() for this job and waits until
 * that thread has published its id in jcr->SD_msg_chan.
 * A failing pthread_create() is reported through Jmsg1() with M_ABORT.
 * NOTE(review): the return statements and the body of the wait loop are
 * not visible in this excerpt.
 */
264 int start_storage_daemon_message_thread(JCR *jcr)
/*
 * The extra use count belongs to the new thread; msg_thread_cleanup()
 * calls free_jcr() when that thread ends.
 * NOTE(review): no locking around this increment is visible here.
 */
270 jcr->use_count++; /* mark in use by msg thread */
271 jcr->sd_msg_thread_done = false;
/* Zero means not started yet; msg_thread() stores pthread_self() here. */
272 jcr->SD_msg_chan = 0;
274 Dmsg0(100, "Start SD msg_thread.\n");
275 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
277 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
279 Dmsg0(100, "SD msg_thread started.\n");
280 /* Wait for thread to start */
281 while (jcr->SD_msg_chan == 0) {
/*
 * Cleanup handler for the SD message thread. msg_thread() registers it
 * with pthread_cleanup_push() and runs it through pthread_cleanup_pop(1).
 * arg is the JCR pointer that was handed to the thread.
 */
287 extern "C" void msg_thread_cleanup(void *arg)
289 JCR *jcr = (JCR *)arg;
290 Dmsg0(200, "End msg_thread\n");
291 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
/*
 * Set the done flag before the broadcast: the loop in
 * wait_for_storage_daemon_termination() tests this flag after waiting on
 * term_wait.
 */
293 jcr->sd_msg_thread_done = true;
294 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
295 jcr->SD_msg_chan = 0;
/* Pairs with the use_count increment in start_storage_daemon_message_thread(). */
297 free_jcr(jcr); /* release jcr */
301 * Handle the message channel (i.e. requests from the
303 * Note, we are running in a separate thread.
/*
 * Thread body that reads messages from the Storage daemon on
 * jcr->store_bsock until bget_dirmsg() returns a negative value.
 * Each message is compared with the Job_start and Job_end formats; a
 * matching Job_end supplies the SD job status, file count and byte count
 * that are stored in the JCR. After the loop a socket error, as reported
 * by is_bnet_error(), sets SDJobStatus to JS_ErrorTerminated.
 * arg is the JCR pointer. The return statement is not visible here.
 * NOTE(review): the sscanf() calls pass &Job, a pointer to the whole
 * array, where %s expects a char pointer. The address is the same as
 * Job but the type differs; passing Job would match the format.
 */
305 extern "C" void *msg_thread(void *arg)
307 JCR *jcr = (JCR *)arg;
/* Target of the %127s conversions, which need room for 128 bytes. */
310 char Job[MAX_NAME_LENGTH];
/* The thread is detached, so it is never joined. */
315 pthread_detach(pthread_self());
/* A non-zero value releases the creator, which polls SD_msg_chan. */
316 jcr->SD_msg_chan = pthread_self();
317 pthread_cleanup_push(msg_thread_cleanup, arg);
318 sd = jcr->store_bsock;
320 /* Read the Storage daemon's output.
322 Dmsg0(100, "Start msg_thread loop\n");
323 while ((stat=bget_dirmsg(sd)) >= 0) {
324 Dmsg1(200, "<stored: %s", sd->msg);
325 if (sscanf(sd->msg, Job_start, &Job) == 1) {
328 if (sscanf(sd->msg, Job_end, &Job, &JobStatus, &JobFiles,
330 jcr->SDJobStatus = JobStatus; /* termination status */
331 jcr->SDJobFiles = JobFiles;
332 jcr->SDJobBytes = JobBytes;
/* Overrides any status taken from a Job_end message. */
336 if (is_bnet_error(sd)) {
337 jcr->SDJobStatus = JS_ErrorTerminated;
339 pthread_cleanup_pop(1); /* remove and execute the handler */
/*
 * Waits until the SD message thread has set jcr->sd_msg_thread_done,
 * waking up at least every 10 seconds to check whether the job was
 * canceled.
 * NOTE(review): POSIX requires the calling thread to hold jcr->mutex
 * when it calls pthread_cond_timedwait(); the lock and unlock calls, the
 * tv_nsec assignment and the cancel_count increment are not visible in
 * this excerpt.
 */
343 void wait_for_storage_daemon_termination(JCR *jcr)
345 int cancel_count = 0;
346 /* Now wait for Storage daemon to terminate our message thread */
347 set_jcr_job_status(jcr, JS_WaitSD);
349 while (!jcr->sd_msg_thread_done) {
352 struct timespec timeout;
354 gettimeofday(&tv, &tz);
/* pthread_cond_timedwait() takes an absolute time, hence now plus 10. */
356 timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */
357 Dmsg0(300, "I'm waiting for message thread termination.\n");
/*
 * The return value is ignored; timeout and wakeup are told apart only
 * by re-testing the loop condition.
 */
358 pthread_cond_timedwait(&jcr->term_wait, &jcr->mutex, &timeout);
359 if (job_canceled(jcr)) {
362 /* Give SD 30 seconds to clean up after cancel */
/* Presumably three 10 second waits; confirm where cancel_count is incremented. */
363 if (cancel_count == 3) {
368 set_jcr_job_status(jcr, JS_Terminated);
/*
 * Thread body that connects to the Storage daemon with a control JCR
 * named *DeviceInit* and calls update_device_res() for every R_DEVICE
 * resource, then closes the connection.
 * Up to MAX_TRIES connection attempts are made, each with a retry
 * interval of 10, a maximum retry time of 30 and verbose on.
 * arg is not used in the visible lines.
 * NOTE(review): the loop exits, the release of the JCR and the return
 * statement are not visible in this excerpt.
 */
374 extern "C" void *device_thread(void *arg)
/* The thread is detached, so it is never joined. */
381 pthread_detach(pthread_self());
382 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
383 for (i=0; i < MAX_TRIES; i++) {
384 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
385 Dmsg0(000, "Failed connecting to SD.\n");
389 foreach_res(dev, R_DEVICE) {
/* A failed update is only logged at debug level 900. */
390 if (!update_device_res(jcr, dev)) {
391 Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
393 Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
/* Clear the pointer after closing so the JCR does not keep a stale socket. */
397 bnet_close(jcr->store_bsock);
398 jcr->store_bsock = NULL;
407 * Start a thread to handle getting Device resource information
408 * from SD. This is called once at startup of the Director.
/*
 * Creates the thread running device_thread() with a NULL argument.
 * A failing pthread_create() is reported through Jmsg1() with a NULL JCR
 * and M_ABORT.
 * NOTE(review): the error text below says message thread although the
 * thread being created is the device thread; likely copied from
 * start_storage_daemon_message_thread(). The end of this function is
 * not visible in this excerpt.
 */
410 void init_device_resources()
415 Dmsg0(100, "Start Device thread.\n");
416 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
418 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));