]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
04ac72bfeecb5ddf9b4526f6fc676421327619c4
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Copyright (C) 2000-2006 Kern Sibbald
20
21    This program is free software; you can redistribute it and/or
22    modify it under the terms of the GNU General Public License
23    version 2 as amended with additional clauses defined in the
24    file LICENSE in the main source directory.
25
26    This program is distributed in the hope that it will be useful,
27    but WITHOUT ANY WARRANTY; without even the implied warranty of
28    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
29    the file LICENSE for additional details.
30
31  */
32
33 #include "bacula.h"
34 #include "dird.h"
35
36 /* Commands sent to Storage daemon */
37 static char jobcmd[]     = "JobId=%d job=%s job_name=%s client_name=%s "
38    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
39    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
40 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
41    "pool_type=%s append=%d copy=%d stripe=%d\n";
42 static char use_device[] = "use device=%s\n";
43 //static char query_device[] = _("query device=%s");
44
45 /* Response from Storage daemon */
46 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
47 static char OK_device[]  = "3000 OK use device device=%s\n";
48
49 /* Storage Daemon requests */
50 static char Job_start[]  = "3010 Job %127s start\n";
51 static char Job_end[]    =
52    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
53
54 /* Forward referenced functions */
55 extern "C" void *msg_thread(void *arg);
56
57 /*
58  * Establish a message channel connection with the Storage daemon
59  * and perform authentication.
60  */
61 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
62                               int max_retry_time, int verbose)
63 {
64    BSOCK *sd;
65    STORE *store;
66
67    if (jcr->store_bsock) {
68       return true;                    /* already connected */
69    }
70    store = (STORE *)jcr->storage->first();
71
72    /*
73     *  Open message channel with the Storage daemon
74     */
75    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
76       store->SDport);
77    sd = bnet_connect(jcr, retry_interval, max_retry_time,
78           _("Storage daemon"), store->address,
79           NULL, store->SDport, verbose);
80    if (sd == NULL) {
81       return false;
82    }
83    sd->res = (RES *)store;        /* save pointer to other end */
84    jcr->store_bsock = sd;
85
86    if (!authenticate_storage_daemon(jcr, store)) {
87       bnet_close(sd);
88       jcr->store_bsock = NULL;
89       return false;
90    }
91    return true;
92 }
93
94 /*
95  * Here we ask the SD to send us the info for a 
96  *  particular device resource.
97  */
98 #ifdef needed
99 bool update_device_res(JCR *jcr, DEVICE *dev)
100 {
101    POOL_MEM device_name; 
102    BSOCK *sd;
103    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
104       return false;
105    }
106    sd = jcr->store_bsock;
107    pm_strcpy(device_name, dev->hdr.name);
108    bash_spaces(device_name);
109    bnet_fsend(sd, query_device, device_name.c_str());
110    Dmsg1(100, ">stored: %s\n", sd->msg);
111    /* The data is returned through Device_update */
112    if (bget_dirmsg(sd) <= 0) {
113       return false;
114    }
115    return true;
116 }
117 #endif
118
119 /*
120  * Start a job with the Storage daemon
121  */
122 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
123 {
124    bool ok = true;
125    STORE *storage;
126    BSOCK *sd;
127    char auth_key[100];
128    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
129    int copy = 0;
130    int stripe = 0;
131
132    sd = jcr->store_bsock;
133    /*
134     * Now send JobId and permissions, and get back the authorization key.
135     */
136    bash_spaces(jcr->job->hdr.name);
137    bash_spaces(jcr->client->hdr.name);
138    bash_spaces(jcr->fileset->hdr.name);
139    if (jcr->fileset->MD5[0] == 0) {
140       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
141    }
142    bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
143               jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
144               jcr->fileset->hdr.name, !jcr->pool->catalog_files,
145               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
146               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
147    Dmsg1(100, ">stored: %s\n", sd->msg);
148    unbash_spaces(jcr->job->hdr.name);
149    unbash_spaces(jcr->client->hdr.name);
150    unbash_spaces(jcr->fileset->hdr.name);
151    if (bget_dirmsg(sd) > 0) {
152        Dmsg1(100, "<stored: %s", sd->msg);
153        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
154                   &jcr->VolSessionTime, &auth_key) != 3) {
155           Dmsg1(100, "BadJob=%s\n", sd->msg);
156           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
157           return 0;
158        } else {
159           jcr->sd_auth_key = bstrdup(auth_key);
160           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
161        }
162    } else {
163       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
164          bnet_strerror(sd));
165       return 0;
166    }
167
168    pm_strcpy(pool_type, jcr->pool->pool_type);
169    pm_strcpy(pool_name, jcr->pool->hdr.name);
170    bash_spaces(pool_type);
171    bash_spaces(pool_name);
172
173    /*
174     * We have two loops here. The first comes from the 
175     *  Storage = associated with the Job, and we need 
176     *  to attach to each one.
177     * The inner loop loops over all the alternative devices
178     *  associated with each Storage. It selects the first
179     *  available one.
180     *
181     */
182    /* Do read side of storage daemon */
183    if (ok && rstore) {
184       foreach_alist(storage, rstore) {
185          pm_strcpy(store_name, storage->hdr.name);
186          bash_spaces(store_name);
187          pm_strcpy(media_type, storage->media_type);
188          bash_spaces(media_type);
189          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
190                     pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
191
192          DEVICE *dev;
193          /* Loop over alternative storage Devices until one is OK */
194          foreach_alist(dev, storage->device) {
195             pm_strcpy(device_name, dev->hdr.name);
196             bash_spaces(device_name);
197             bnet_fsend(sd, use_device, device_name.c_str());
198             Dmsg1(100, ">stored: %s", sd->msg);
199          }
200          bnet_sig(sd, BNET_EOD);            /* end of Devices */
201          bnet_sig(sd, BNET_EOD);            /* end of Storages */
202          if (bget_dirmsg(sd) > 0) {
203             Dmsg1(100, "<stored: %s", sd->msg);
204             /* ****FIXME**** save actual device name */
205             ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
206          } else {
207             POOL_MEM err_msg;
208             pm_strcpy(err_msg, sd->msg); /* save message */
209             Jmsg(jcr, M_FATAL, 0, _("\n"
210                "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
211                device_name.c_str(), err_msg.c_str()/* sd->msg */);
212             ok = false;
213          }
214          break;
215       }
216    }
217
218    /* Do write side of storage daemon */
219    if (ok && wstore) {
220       foreach_alist(storage, wstore) {
221          pm_strcpy(store_name, storage->hdr.name);
222          bash_spaces(store_name);
223          pm_strcpy(media_type, storage->media_type);
224          bash_spaces(media_type);
225          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
226                     pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
227
228          DEVICE *dev;
229          /* Loop over alternative storage Devices until one is OK */
230          foreach_alist(dev, storage->device) {
231             pm_strcpy(device_name, dev->hdr.name);
232             bash_spaces(device_name);
233             bnet_fsend(sd, use_device, device_name.c_str());
234             Dmsg1(100, ">stored: %s", sd->msg);
235          }
236          bnet_sig(sd, BNET_EOD);            /* end of Devices */
237          bnet_sig(sd, BNET_EOD);            /* end of Storages */
238          if (bget_dirmsg(sd) > 0) {
239             Dmsg1(100, "<stored: %s", sd->msg);
240             /* ****FIXME**** save actual device name */
241             ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
242          } else {
243             POOL_MEM err_msg;
244             pm_strcpy(err_msg, sd->msg); /* save message */
245             Jmsg(jcr, M_FATAL, 0, _("\n"
246                "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
247                device_name.c_str(), err_msg.c_str()/* sd->msg */);
248             ok = false;
249          }
250          break;
251       }
252    }
253    return ok;
254 }
255
256 /*
257  * Start a thread to handle Storage daemon messages and
258  *  Catalog requests.
259  */
260 int start_storage_daemon_message_thread(JCR *jcr)
261 {
262    int status;
263    pthread_t thid;
264
265    P(jcr->mutex);
266    jcr->use_count++;                  /* mark in use by msg thread */
267    jcr->sd_msg_thread_done = false;
268    jcr->SD_msg_chan = 0;
269    V(jcr->mutex);
270    Dmsg0(100, "Start SD msg_thread.\n");
271    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
272       berrno be;
273       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
274    }
275    Dmsg0(100, "SD msg_thread started.\n");
276    /* Wait for thread to start */
277    while (jcr->SD_msg_chan == 0) {
278       bmicrosleep(0, 50);
279    }
280    return 1;
281 }
282
283 extern "C" void msg_thread_cleanup(void *arg)
284 {
285    JCR *jcr = (JCR *)arg;
286    Dmsg0(200, "End msg_thread\n");
287    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
288    P(jcr->mutex);
289    jcr->sd_msg_thread_done = true;
290    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
291    jcr->SD_msg_chan = 0;
292    V(jcr->mutex);
293    free_jcr(jcr);                     /* release jcr */
294 }
295
296 /*
297  * Handle the message channel (i.e. requests from the
298  *  Storage daemon).
299  * Note, we are running in a separate thread.
300  */
301 extern "C" void *msg_thread(void *arg)
302 {
303    JCR *jcr = (JCR *)arg;
304    BSOCK *sd;
305    int JobStatus;
306    char Job[MAX_NAME_LENGTH];
307    uint32_t JobFiles;
308    uint64_t JobBytes;
309    int stat;
310
311    pthread_detach(pthread_self());
312    jcr->SD_msg_chan = pthread_self();
313    pthread_cleanup_push(msg_thread_cleanup, arg);
314    sd = jcr->store_bsock;
315
316    /* Read the Storage daemon's output.
317     */
318    Dmsg0(100, "Start msg_thread loop\n");
319    while ((stat=bget_dirmsg(sd)) >= 0) {
320       Dmsg1(200, "<stored: %s", sd->msg);
321       if (sscanf(sd->msg, Job_start, &Job) == 1) {
322          continue;
323       }
324       if (sscanf(sd->msg, Job_end, &Job, &JobStatus, &JobFiles,
325                  &JobBytes) == 4) {
326          jcr->SDJobStatus = JobStatus; /* termination status */
327          jcr->SDJobFiles = JobFiles;
328          jcr->SDJobBytes = JobBytes;
329          break;
330       }
331    }
332    if (is_bnet_error(sd)) {
333       jcr->SDJobStatus = JS_ErrorTerminated;
334    }
335    pthread_cleanup_pop(1);            /* remove and execute the handler */
336    return NULL;
337 }
338
339 void wait_for_storage_daemon_termination(JCR *jcr)
340 {
341    int cancel_count = 0;
342    /* Now wait for Storage daemon to terminate our message thread */
343    set_jcr_job_status(jcr, JS_WaitSD);
344    P(jcr->mutex);
345    while (!jcr->sd_msg_thread_done) {
346       struct timeval tv;
347       struct timezone tz;
348       struct timespec timeout;
349
350       gettimeofday(&tv, &tz);
351       timeout.tv_nsec = 0;
352       timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */
353       Dmsg0(300, "I'm waiting for message thread termination.\n");
354       pthread_cond_timedwait(&jcr->term_wait, &jcr->mutex, &timeout);
355       if (job_canceled(jcr)) {
356          cancel_count++;
357       }
358       /* Give SD 30 seconds to clean up after cancel */
359       if (cancel_count == 3) {
360          break;
361       }
362    }
363    V(jcr->mutex);
364    set_jcr_job_status(jcr, JS_Terminated);
365 }
366
367 #ifdef needed
368 #define MAX_TRIES 30
369 #define WAIT_TIME 2
370 extern "C" void *device_thread(void *arg)
371 {
372    int i;
373    JCR *jcr;
374    DEVICE *dev;
375
376
377    pthread_detach(pthread_self());
378    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
379    for (i=0; i < MAX_TRIES; i++) {
380       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
381          Dmsg0(000, "Failed connecting to SD.\n");
382          continue;
383       }
384       LockRes();
385       foreach_res(dev, R_DEVICE) {
386          if (!update_device_res(jcr, dev)) {
387             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
388          } else {
389             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
390          }
391       }
392       UnlockRes();
393       bnet_close(jcr->store_bsock);
394       jcr->store_bsock = NULL;
395       break;
396
397    }
398    free_jcr(jcr);
399    return NULL;
400 }
401
402 /*
403  * Start a thread to handle getting Device resource information
404  *  from SD. This is called once at startup of the Director.
405  */
406 void init_device_resources()
407 {
408    int status;
409    pthread_t thid;
410
411    Dmsg0(100, "Start Device thread.\n");
412    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
413       berrno be;
414       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
415    }
416 }
417 #endif