3 * Bacula Director -- msgchan.c -- handles the message channel
4 * to the Storage daemon and the File daemon.
6 * Kern Sibbald, August MM
8 * This routine runs as a thread and must be thread reentrant.
10 * Basic tasks done here:
11 * Open a message channel with the Storage daemon
12 * to authenticate ourselves and to pass the JobId.
13 * Create a thread to interact with the Storage daemon,
14 * which returns a job status and requests Catalog services, etc.
19 Copyright (C) 2000-2005 Kern Sibbald
21 This program is free software; you can redistribute it and/or
22 modify it under the terms of the GNU General Public License
23 version 2 as amended with additional clauses defined in the
24 file LICENSE in the main source directory.
26 This program is distributed in the hope that it will be useful,
27 but WITHOUT ANY WARRANTY; without even the implied warranty of
28 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 file LICENSE for additional details.
/*
 * Protocol strings for the Director <-> Storage daemon channel.
 * Outgoing commands are printf-style formats passed to bnet_fsend();
 * the response/request strings are sscanf() formats applied to sd->msg.
 * The argument order and types at every call site must match these
 * formats exactly.
 */
36 /* Commands sent to Storage daemon */
/* Sent by start_storage_daemon_job(); a successful reply matches OKjob. */
37 static char jobcmd[] = "JobId=%d job=%s job_name=%s client_name=%s "
38 "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
39 "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
/*
 * start_storage_daemon_job() sends one use_storage line per Storage in its
 * list, each followed by use_device lines for that Storage's devices.
 */
40 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
41 "pool_type=%s append=%d copy=%d stripe=%d\n";
42 static char use_device[] = "use device=%s\n";
/*
 * NOTE(review): query_device is commented out here, yet update_device_res()
 * passes query_device to bnet_fsend(). Unless it is declared elsewhere, or
 * that function is compiled out by lines not shown in this listing, the
 * file will not compile -- confirm.
 */
43 //static char query_device[] = "query device=%s";
45 /* Response from Storage daemon */
/*
 * NOTE(review): "%100s" stores up to 100 characters plus a terminating NUL,
 * so the Authorization destination must hold at least 101 bytes. Its
 * declaration is not visible in this listing -- confirm.
 */
46 static char OKjob[] = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
/*
 * NOTE(review): the %s in OK_device has no field width, so sscanf() can
 * write a device name of any length into the destination buffer.
 */
47 static char OK_device[] = "3000 OK use device device=%s\n";
49 /* Storage Daemon requests */
/*
 * Parsed by msg_thread(). "%127s" needs a destination of at least 128 bytes.
 * msg_thread() uses char Job[MAX_NAME_LENGTH]; confirm MAX_NAME_LENGTH >= 128.
 * lld is presumably a macro supplying the 64-bit conversion length
 * modifier; it is defined outside this listing.
 */
50 static char Job_start[] = "3010 Job %127s start\n";
51 static char Job_end[] =
52 "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
54 /* Forward referenced functions */
55 extern "C" void *msg_thread(void *arg);
/*
 * connect_to_storage_daemon()
 *
 * If jcr->store_bsock is already set, this returns true immediately.
 * Otherwise it connects to the first Storage resource in jcr->storage,
 * forwarding retry_interval, max_retry_time and verbose to bnet_connect().
 * It then:
 *   - records that Storage as the socket's peer resource (sd->res),
 *   - stores the socket in jcr->store_bsock,
 *   - authenticates with authenticate_storage_daemon().
 * If authentication fails, jcr->store_bsock is reset to NULL.
 *
 * NOTE(review): the result of jcr->storage->first() is not checked, so
 * this assumes the list is non-empty. This listing also omits several lines
 * of the function (local declarations, the remaining return statements and
 * the closing braces), so the failure-path return values are not visible.
 */
58 * Establish a message channel connection with the Storage daemon
59 * and perform authentication.
61 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
62 int max_retry_time, int verbose)
67 if (jcr->store_bsock) {
68 return true; /* already connected */
70 store = (STORE *)jcr->storage->first();
73 * Open message channel with the Storage daemon
75 Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
77 sd = bnet_connect(jcr, retry_interval, max_retry_time,
78 _("Storage daemon"), store->address,
79 NULL, store->SDport, verbose);
/*
 * NOTE(review): no NULL check on the bnet_connect() result is visible
 * before sd is dereferenced below. Confirm that one exists in the omitted
 * lines.
 */
83 sd->res = (RES *)store; /* save pointer to other end */
84 jcr->store_bsock = sd;
86 if (!authenticate_storage_daemon(jcr, store)) {
/*
 * NOTE(review): confirm that sd is closed on this path. Clearing the
 * pointer alone would leak the socket.
 */
88 jcr->store_bsock = NULL;
/*
 * update_device_res()
 *
 * Connects to the SD, or reuses the existing connection, via
 * connect_to_storage_daemon() with retry_interval=5, max_retry_time=30 and
 * verbose=0. It then sends the query_device command naming dev; spaces in
 * the name are bashed before sending. Per the inline comment, the reply
 * data arrives through Device_update. The visible code then tests whether
 * bget_dirmsg() returns <= 0; that branch's body is not shown.
 *
 * NOTE(review): query_device is commented out at file scope in this
 * listing. The return statements and closing braces are also omitted here.
 */
95 * Here we ask the SD to send us the info for a
96 * particular device resource.
99 bool update_device_res(JCR *jcr, DEVICE *dev)
101 POOL_MEM device_name;
103 if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
106 sd = jcr->store_bsock;
107 pm_strcpy(device_name, dev->hdr.name);
108 bash_spaces(device_name);
109 bnet_fsend(sd, query_device, device_name.c_str());
110 Dmsg1(100, ">stored: %s\n", sd->msg);
111 /* The data is returned through Device_update */
112 if (bget_dirmsg(sd) <= 0) {
/*
 * start_storage_daemon_job()
 *
 * No connect call is visible; sd is taken from jcr->store_bsock.
 *
 * Step 1: send jobcmd describing the job.
 *   - The Job, client and FileSet names are space-bashed only for the send
 *     and are unbashed right after it.
 *   - An empty FileSet MD5 is replaced with the string "**Dummy**". This
 *     writes into jcr->fileset and is not undone.
 *   - A reply matching OKjob fills jcr->VolSessionId, jcr->VolSessionTime
 *     and the authorization key, which is duplicated into jcr->sd_auth_key.
 *   - A non-matching reply is reported with a fatal Jmsg.
 *
 * Step 2: for each Storage in 'store':
 *   - send use_storage with that Storage's name and media type, the job's
 *     pool name and type, and the append, copy and stripe values;
 *   - send one use_device line per Device of that Storage;
 *   - mark the end of the device list and of the storage list with
 *     BNET_EOD signals.
 *   A reply matching OK_device sets ok. If bget_dirmsg() fails, a fatal
 *   Jmsg quotes the SD's last message together with the last device name
 *   sent (still space-bashed). Part of this else branch is omitted.
 *
 * Step 3: send "run"; ok takes bnet_fsend()'s result. Whether this step
 * is conditional on ok is not visible here.
 *
 * NOTE(review): this listing omits the declarations of sd, auth_key,
 * storage, dev, copy, stripe, PoolId, ok and err_msg, the return
 * statements and most closing braces. The branch structure described
 * above is therefore partly inferred.
 */
120 * Start a job with the Storage daemon
122 int start_storage_daemon_job(JCR *jcr, alist *store, int append)
128 POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
133 sd = jcr->store_bsock;
135 * Now send JobId and permissions, and get back the authorization key.
137 bash_spaces(jcr->job->hdr.name);
138 bash_spaces(jcr->client->hdr.name);
139 bash_spaces(jcr->fileset->hdr.name);
140 if (jcr->fileset->MD5[0] == 0) {
141 bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
143 bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
144 jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
145 jcr->fileset->hdr.name, !jcr->pool->catalog_files,
146 jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
147 jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
148 Dmsg1(100, ">stored: %s\n", sd->msg);
149 unbash_spaces(jcr->job->hdr.name);
150 unbash_spaces(jcr->client->hdr.name);
151 unbash_spaces(jcr->fileset->hdr.name);
152 if (bget_dirmsg(sd) > 0) {
153 Dmsg1(100, "<stored: %s", sd->msg);
/*
 * NOTE(review): if auth_key is a char array, pass auth_key rather than
 * &auth_key for %100s. The address is the same but the pointer type is
 * wrong. The array must also hold at least 101 bytes.
 */
154 if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
155 &jcr->VolSessionTime, &auth_key) != 3) {
156 Dmsg1(100, "BadJob=%s\n", sd->msg);
157 Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
/*
 * NOTE(review): any previous jcr->sd_auth_key is overwritten here without
 * being freed. Confirm that it is NULL on entry.
 */
160 jcr->sd_auth_key = bstrdup(auth_key);
161 Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
164 Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
169 pm_strcpy(pool_type, jcr->pool->pool_type);
170 pm_strcpy(pool_name, jcr->pool->hdr.name);
171 bash_spaces(pool_type);
172 bash_spaces(pool_name);
/*
 * NOTE(review): PoolId is filled here but no visible line of this function
 * reads it. It may be used in omitted lines; otherwise it can be removed.
 */
173 edit_int64(jcr->PoolId, PoolId);
176 * We have two loops here. The first comes from the
177 * Storage = associated with the Job, and we need
178 * to attach to each one.
179 * The inner loop loops over all the alternative devices
180 * associated with each Storage. It selects the first
183 * Note, the outer loop is not yet implemented.
/*
 * NOTE(review): the text above says the outer loop is not yet implemented,
 * but the code below iterates over every Storage in 'store'. That comment
 * may be stale.
 */
185 foreach_alist(storage, store) {
186 // storage = (STORE *)store->first();
187 pm_strcpy(store_name, storage->hdr.name);
188 bash_spaces(store_name);
189 pm_strcpy(media_type, storage->media_type);
190 bash_spaces(media_type);
191 bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(),
192 pool_name.c_str(), pool_type.c_str(), append, copy, stripe);
195 /* Loop over alternative storage Devices until one is OK */
196 foreach_alist(dev, storage->device) {
197 pm_strcpy(device_name, dev->hdr.name);
198 bash_spaces(device_name);
199 bnet_fsend(sd, use_device, device_name.c_str());
200 Dmsg1(100, ">stored: %s", sd->msg);
202 bnet_sig(sd, BNET_EOD); /* end of Devices */
203 bnet_sig(sd, BNET_EOD); /* end of Storages */
204 if (bget_dirmsg(sd) > 0) {
205 Dmsg1(100, "<stored: %s", sd->msg);
206 /* ****FIXME**** save actual device name */
/*
 * NOTE(review): OK_device's %s has no field width and writes through
 * device_name.c_str(). That buffer was presumably last sized by
 * pm_strcpy() for the final device name sent, so a longer name in the
 * reply could overflow it. Confirm the POOL_MEM capacity, or add a field
 * width.
 */
207 ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
210 pm_strcpy(err_msg, sd->msg); /* save message */
211 Jmsg(jcr, M_FATAL, 0, _("\n"
212 " Storage daemon didn't accept Device \"%s\" because:\n %s"),
213 device_name.c_str(), err_msg.c_str()/* sd->msg */);
218 ok = bnet_fsend(sd, "run");
219 Dmsg1(100, ">stored: %s\n", sd->msg);
/*
 * start_storage_daemon_message_thread()
 *
 * Takes an extra reference on jcr (use_count) for the new thread, resets
 * sd_msg_thread_done and SD_msg_chan, and starts msg_thread(jcr). A
 * pthread_create() failure is reported with Jmsg1(M_ABORT). The function
 * then waits until the new thread stores its id in jcr->SD_msg_chan.
 *
 * NOTE(review): the body of the wait loop and the end of the function are
 * not visible in this listing. SD_msg_chan holds a pthread_t written by
 * another thread. Comparing it with 0, and polling it (and bumping
 * use_count) without visible locking, is not portable. Confirm that the
 * omitted lines synchronize this.
 */
225 * Start a thread to handle Storage daemon messages and
228 int start_storage_daemon_message_thread(JCR *jcr)
234 jcr->use_count++; /* mark in use by msg thread */
235 jcr->sd_msg_thread_done = false;
236 jcr->SD_msg_chan = 0;
238 Dmsg0(100, "Start SD msg_thread.\n");
239 if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
241 Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
243 Dmsg0(100, "SD msg_thread started.\n");
244 /* Wait for thread to start */
245 while (jcr->SD_msg_chan == 0) {
/*
 * msg_thread_cleanup()
 *
 * Cleanup handler that msg_thread() registers with pthread_cleanup_push()
 * and runs through pthread_cleanup_pop(1); arg is the thread's JCR. It:
 *   - ends any open database transaction,
 *   - sets sd_msg_thread_done,
 *   - wakes threads waiting on jcr->term_wait,
 *   - clears SD_msg_chan,
 *   - calls free_jcr(), presumably dropping the reference taken by
 *     start_storage_daemon_message_thread().
 *
 * NOTE(review): waiters call pthread_cond_timedwait() with jcr->mutex, so
 * the flag update and the broadcast should be done while holding that
 * mutex. Any locking lines are not visible in this listing -- confirm.
 */
251 extern "C" void msg_thread_cleanup(void *arg)
253 JCR *jcr = (JCR *)arg;
254 Dmsg0(200, "End msg_thread\n");
255 db_end_transaction(jcr, jcr->db); /* terminate any open transaction */
257 jcr->sd_msg_thread_done = true;
258 pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
259 jcr->SD_msg_chan = 0;
261 free_jcr(jcr); /* release jcr */
/*
 * msg_thread()
 *
 * Runs in its own detached thread; arg is the JCR. It:
 *   - publishes its thread id in jcr->SD_msg_chan, which ends the wait in
 *     start_storage_daemon_message_thread();
 *   - registers msg_thread_cleanup();
 *   - reads messages from jcr->store_bsock with bget_dirmsg() for as long
 *     as it returns >= 0.
 * Job_start messages are recognized. A Job_end message's status, file
 * count and byte count are copied into jcr->SDJobStatus, SDJobFiles and
 * SDJobBytes. If the socket ends in error, SDJobStatus is set to
 * JS_ErrorTerminated. pthread_cleanup_pop(1) then runs the cleanup handler.
 *
 * NOTE(review): this listing omits the bodies that follow the
 * Job_start/Job_end matches, the rest of the Job_end sscanf() argument
 * list, and the return value.
 */
265 * Handle the message channel (i.e. requests from the
267 * Note, we are running in a separate thread.
269 extern "C" void *msg_thread(void *arg)
271 JCR *jcr = (JCR *)arg;
274 char Job[MAX_NAME_LENGTH];
279 pthread_detach(pthread_self());
280 jcr->SD_msg_chan = pthread_self();
281 pthread_cleanup_push(msg_thread_cleanup, arg);
282 sd = jcr->store_bsock;
284 /* Read the Storage daemon's output.
286 Dmsg0(100, "Start msg_thread loop\n");
287 while ((stat=bget_dirmsg(sd)) >= 0) {
288 Dmsg1(200, "<stored: %s", sd->msg);
/*
 * NOTE(review): Job is a char array, so pass Job rather than &Job to
 * %127s. The address is the same, but the pointer type does not match
 * what %s expects.
 */
289 if (sscanf(sd->msg, Job_start, &Job) == 1) {
292 if (sscanf(sd->msg, Job_end, &Job, &JobStatus, &JobFiles,
294 jcr->SDJobStatus = JobStatus; /* termination status */
295 jcr->SDJobFiles = JobFiles;
296 jcr->SDJobBytes = JobBytes;
300 if (is_bnet_error(sd)) {
301 jcr->SDJobStatus = JS_ErrorTerminated;
303 pthread_cleanup_pop(1); /* remove and execute the handler */
/*
 * wait_for_storage_daemon_termination()
 *
 * Sets the job status to JS_WaitSD and waits for msg_thread() to finish
 * (jcr->sd_msg_thread_done). It blocks on jcr->term_wait with deadlines
 * about 10 seconds ahead. Per the inline comment, the SD gets 30 seconds to
 * clean up after a cancel, checked via cancel_count == 3 (three 10-second
 * waits). The increment of cancel_count and the action taken at 3 are not
 * visible here. The job status is then set to JS_Terminated.
 *
 * NOTE(review): several points need confirming in the omitted lines:
 *   - pthread_cond_timedwait() requires jcr->mutex to be held by this
 *     thread, but no lock is visible.
 *   - timeout.tv_nsec must be initialized; no assignment is visible.
 *   - The &tz argument to gettimeofday() is obsolete; NULL is the
 *     portable choice.
 */
307 void wait_for_storage_daemon_termination(JCR *jcr)
309 int cancel_count = 0;
310 /* Now wait for Storage daemon to terminate our message thread */
311 set_jcr_job_status(jcr, JS_WaitSD);
313 while (!jcr->sd_msg_thread_done) {
316 struct timespec timeout;
318 gettimeofday(&tv, &tz);
320 timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */
321 Dmsg0(300, "I'm waiting for message thread termination.\n");
322 pthread_cond_timedwait(&jcr->term_wait, &jcr->mutex, &timeout);
323 if (job_canceled(jcr)) {
326 /* Give SD 30 seconds to clean up after cancel */
327 if (cancel_count == 3) {
332 set_jcr_job_status(jcr, JS_Terminated);
/*
 * device_thread()
 *
 * Detached thread started by init_device_resources(). It:
 *   - creates a control JCR named "*DeviceInit*" of type JT_SYSTEM;
 *   - loops up to MAX_TRIES times, trying to connect to the Storage daemon
 *     (retry_interval=10, max_retry_time=30, verbose=1);
 *   - calls update_device_res() for every Device resource, logging success
 *     or failure at debug level 900;
 *   - finally closes and clears jcr->store_bsock.
 *
 * NOTE(review): arg is not used in the visible lines. This listing omits
 * the loop control after a failed connect, the release of the control JCR,
 * and the return value.
 */
338 extern "C" void *device_thread(void *arg)
345 pthread_detach(pthread_self());
346 jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
347 for (i=0; i < MAX_TRIES; i++) {
348 if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
349 Dmsg0(000, "Failed connecting to SD.\n");
353 foreach_res(dev, R_DEVICE) {
354 if (!update_device_res(jcr, dev)) {
355 Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
357 Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
361 bnet_close(jcr->store_bsock);
362 jcr->store_bsock = NULL;
/*
 * init_device_resources()
 *
 * Starts device_thread() with a NULL argument to fetch Device resource data
 * from the SD. A pthread_create() failure is reported with Jmsg1(M_ABORT).
 *
 * NOTE(review): the failure message says "message thread", although the
 * thread being created is device_thread(); a more specific text would help
 * diagnosis. The rest of this function lies beyond the visible listing.
 */
371 * Start a thread to handle getting Device resource information
372 * from SD. This is called once at startup of the Director.
374 void init_device_resources()
379 Dmsg0(100, "Start Device thread.\n");
380 if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
382 Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));