]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
- Fix the block listing bug pointed out by Arno.
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Copyright (C) 2000-2006 Kern Sibbald
20
21    This program is free software; you can redistribute it and/or
22    modify it under the terms of the GNU General Public License
23    version 2 as amended with additional clauses defined in the
24    file LICENSE in the main source directory.
25
26    This program is distributed in the hope that it will be useful,
27    but WITHOUT ANY WARRANTY; without even the implied warranty of
28    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
29    the file LICENSE for additional details.
30
31  */
32
33 #include "bacula.h"
34 #include "dird.h"
35
36 /* Commands sent to Storage daemon */
37 static char jobcmd[]     = "JobId=%d job=%s job_name=%s client_name=%s "
38    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
39    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
40 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
41    "pool_type=%s append=%d copy=%d stripe=%d\n";
42 static char use_device[] = "use device=%s\n";
43 //static char query_device[] = _("query device=%s");
44
45 /* Response from Storage daemon */
46 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
47 static char OK_device[]  = "3000 OK use device device=%s\n";
48
49 /* Storage Daemon requests */
50 static char Job_start[]  = "3010 Job %127s start\n";
51 static char Job_end[]    =
52    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld\n";
53
54 /* Forward referenced functions */
55 extern "C" void *msg_thread(void *arg);
56
57 /*
58  * Establish a message channel connection with the Storage daemon
59  * and perform authentication.
60  */
61 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
62                               int max_retry_time, int verbose)
63 {
64    BSOCK *sd;
65    STORE *store;
66
67    if (jcr->store_bsock) {
68       return true;                    /* already connected */
69    }
70    store = (STORE *)jcr->storage->first();
71
72    /*
73     *  Open message channel with the Storage daemon
74     */
75    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
76       store->SDport);
77    sd = bnet_connect(jcr, retry_interval, max_retry_time,
78           _("Storage daemon"), store->address,
79           NULL, store->SDport, verbose);
80    if (sd == NULL) {
81       return false;
82    }
83    sd->res = (RES *)store;        /* save pointer to other end */
84    jcr->store_bsock = sd;
85
86    if (!authenticate_storage_daemon(jcr, store)) {
87       bnet_close(sd);
88       jcr->store_bsock = NULL;
89       return false;
90    }
91    return true;
92 }
93
94 /*
95  * Here we ask the SD to send us the info for a 
96  *  particular device resource.
97  */
98 #ifdef needed
99 bool update_device_res(JCR *jcr, DEVICE *dev)
100 {
101    POOL_MEM device_name; 
102    BSOCK *sd;
103    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
104       return false;
105    }
106    sd = jcr->store_bsock;
107    pm_strcpy(device_name, dev->hdr.name);
108    bash_spaces(device_name);
109    bnet_fsend(sd, query_device, device_name.c_str());
110    Dmsg1(100, ">stored: %s\n", sd->msg);
111    /* The data is returned through Device_update */
112    if (bget_dirmsg(sd) <= 0) {
113       return false;
114    }
115    return true;
116 }
117 #endif
118
119 /*
120  * Start a job with the Storage daemon
121  */
122 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
123 {
124    bool ok = true;
125    STORE *storage;
126    BSOCK *sd;
127    char auth_key[100];
128    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
129    int copy = 0;
130    int stripe = 0;
131
132    sd = jcr->store_bsock;
133    /*
134     * Now send JobId and permissions, and get back the authorization key.
135     */
136    bash_spaces(jcr->job->hdr.name);
137    bash_spaces(jcr->client->hdr.name);
138    bash_spaces(jcr->fileset->hdr.name);
139    if (jcr->fileset->MD5[0] == 0) {
140       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
141    }
142    bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
143               jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
144               jcr->fileset->hdr.name, !jcr->pool->catalog_files,
145               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
146               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
147    Dmsg1(100, ">stored: %s\n", sd->msg);
148    unbash_spaces(jcr->job->hdr.name);
149    unbash_spaces(jcr->client->hdr.name);
150    unbash_spaces(jcr->fileset->hdr.name);
151    if (bget_dirmsg(sd) > 0) {
152        Dmsg1(100, "<stored: %s", sd->msg);
153        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
154                   &jcr->VolSessionTime, &auth_key) != 3) {
155           Dmsg1(100, "BadJob=%s\n", sd->msg);
156           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
157           return 0;
158        } else {
159           jcr->sd_auth_key = bstrdup(auth_key);
160           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
161        }
162    } else {
163       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
164          bnet_strerror(sd));
165       return 0;
166    }
167
168    pm_strcpy(pool_type, jcr->pool->pool_type);
169    pm_strcpy(pool_name, jcr->pool->hdr.name);
170    bash_spaces(pool_type);
171    bash_spaces(pool_name);
172
173    /*
174     * We have two loops here. The first comes from the 
175     *  Storage = associated with the Job, and we need 
176     *  to attach to each one.
177     * The inner loop loops over all the alternative devices
178     *  associated with each Storage. It selects the first
179     *  available one.
180     *
181     */
182    /* Do read side of storage daemon */
183    if (ok && rstore) {
184       foreach_alist(storage, rstore) {
185          pm_strcpy(store_name, storage->hdr.name);
186          bash_spaces(store_name);
187          pm_strcpy(media_type, storage->media_type);
188          bash_spaces(media_type);
189          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
190                     pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
191
192          DEVICE *dev;
193          /* Loop over alternative storage Devices until one is OK */
194          foreach_alist(dev, storage->device) {
195             pm_strcpy(device_name, dev->hdr.name);
196             bash_spaces(device_name);
197             bnet_fsend(sd, use_device, device_name.c_str());
198             Dmsg1(100, ">stored: %s", sd->msg);
199          }
200          bnet_sig(sd, BNET_EOD);            /* end of Devices */
201       }
202       bnet_sig(sd, BNET_EOD);            /* end of Storages */
203       if (bget_dirmsg(sd) > 0) {
204          Dmsg1(100, "<stored: %s", sd->msg);
205          /* ****FIXME**** save actual device name */
206          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
207       } else {
208          POOL_MEM err_msg;
209          pm_strcpy(err_msg, sd->msg); /* save message */
210          Jmsg(jcr, M_FATAL, 0, _("\n"
211             "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
212             device_name.c_str(), err_msg.c_str()/* sd->msg */);
213          ok = false;
214       }
215    }
216
217    /* Do write side of storage daemon */
218    if (ok && wstore) {
219       foreach_alist(storage, wstore) {
220          pm_strcpy(store_name, storage->hdr.name);
221          bash_spaces(store_name);
222          pm_strcpy(media_type, storage->media_type);
223          bash_spaces(media_type);
224          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
225                     pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
226
227          DEVICE *dev;
228          /* Loop over alternative storage Devices until one is OK */
229          foreach_alist(dev, storage->device) {
230             pm_strcpy(device_name, dev->hdr.name);
231             bash_spaces(device_name);
232             bnet_fsend(sd, use_device, device_name.c_str());
233             Dmsg1(100, ">stored: %s", sd->msg);
234          }
235          bnet_sig(sd, BNET_EOD);            /* end of Devices */
236       }
237       bnet_sig(sd, BNET_EOD);            /* end of Storages */
238       if (bget_dirmsg(sd) > 0) {
239          Dmsg1(100, "<stored: %s", sd->msg);
240          /* ****FIXME**** save actual device name */
241          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
242       } else {
243          POOL_MEM err_msg;
244          pm_strcpy(err_msg, sd->msg); /* save message */
245          Jmsg(jcr, M_FATAL, 0, _("\n"
246             "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
247             device_name.c_str(), err_msg.c_str()/* sd->msg */);
248          ok = false;
249       }
250    }
251    return ok;
252 }
253
254 /*
255  * Start a thread to handle Storage daemon messages and
256  *  Catalog requests.
257  */
258 int start_storage_daemon_message_thread(JCR *jcr)
259 {
260    int status;
261    pthread_t thid;
262
263    P(jcr->mutex);
264    jcr->use_count++;                  /* mark in use by msg thread */
265    jcr->sd_msg_thread_done = false;
266    jcr->SD_msg_chan = 0;
267    V(jcr->mutex);
268    Dmsg0(100, "Start SD msg_thread.\n");
269    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
270       berrno be;
271       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
272    }
273    Dmsg0(100, "SD msg_thread started.\n");
274    /* Wait for thread to start */
275    while (jcr->SD_msg_chan == 0) {
276       bmicrosleep(0, 50);
277    }
278    return 1;
279 }
280
281 extern "C" void msg_thread_cleanup(void *arg)
282 {
283    JCR *jcr = (JCR *)arg;
284    Dmsg0(200, "End msg_thread\n");
285    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
286    P(jcr->mutex);
287    jcr->sd_msg_thread_done = true;
288    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
289    jcr->SD_msg_chan = 0;
290    V(jcr->mutex);
291    free_jcr(jcr);                     /* release jcr */
292 }
293
294 /*
295  * Handle the message channel (i.e. requests from the
296  *  Storage daemon).
297  * Note, we are running in a separate thread.
298  */
299 extern "C" void *msg_thread(void *arg)
300 {
301    JCR *jcr = (JCR *)arg;
302    BSOCK *sd;
303    int JobStatus;
304    char Job[MAX_NAME_LENGTH];
305    uint32_t JobFiles;
306    uint64_t JobBytes;
307    int stat;
308
309    pthread_detach(pthread_self());
310    jcr->SD_msg_chan = pthread_self();
311    pthread_cleanup_push(msg_thread_cleanup, arg);
312    sd = jcr->store_bsock;
313
314    /* Read the Storage daemon's output.
315     */
316    Dmsg0(100, "Start msg_thread loop\n");
317    while ((stat=bget_dirmsg(sd)) >= 0) {
318       int stat;
319       Dmsg1(3400, "<stored: %s", sd->msg);
320       if (sscanf(sd->msg, Job_start, Job) == 1) {
321          continue;
322       }
323       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
324                  &JobBytes)) == 4) {
325          jcr->SDJobStatus = JobStatus; /* termination status */
326          jcr->SDJobFiles = JobFiles;
327          jcr->SDJobBytes = JobBytes;
328          break;
329       }
330    }
331    if (is_bnet_error(sd)) {
332       jcr->SDJobStatus = JS_ErrorTerminated;
333    }
334    pthread_cleanup_pop(1);            /* remove and execute the handler */
335    return NULL;
336 }
337
338 void wait_for_storage_daemon_termination(JCR *jcr)
339 {
340    int cancel_count = 0;
341    /* Now wait for Storage daemon to terminate our message thread */
342    set_jcr_job_status(jcr, JS_WaitSD);
343    P(jcr->mutex);
344    while (!jcr->sd_msg_thread_done) {
345       struct timeval tv;
346       struct timezone tz;
347       struct timespec timeout;
348
349       gettimeofday(&tv, &tz);
350       timeout.tv_nsec = 0;
351       timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */
352       Dmsg0(300, "I'm waiting for message thread termination.\n");
353       pthread_cond_timedwait(&jcr->term_wait, &jcr->mutex, &timeout);
354       if (job_canceled(jcr)) {
355          cancel_count++;
356       }
357       /* Give SD 30 seconds to clean up after cancel */
358       if (cancel_count == 3) {
359          break;
360       }
361    }
362    V(jcr->mutex);
363    set_jcr_job_status(jcr, JS_Terminated);
364 }
365
366 #ifdef needed
367 #define MAX_TRIES 30
368 #define WAIT_TIME 2
369 extern "C" void *device_thread(void *arg)
370 {
371    int i;
372    JCR *jcr;
373    DEVICE *dev;
374
375
376    pthread_detach(pthread_self());
377    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
378    for (i=0; i < MAX_TRIES; i++) {
379       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
380          Dmsg0(000, "Failed connecting to SD.\n");
381          continue;
382       }
383       LockRes();
384       foreach_res(dev, R_DEVICE) {
385          if (!update_device_res(jcr, dev)) {
386             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
387          } else {
388             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
389          }
390       }
391       UnlockRes();
392       bnet_close(jcr->store_bsock);
393       jcr->store_bsock = NULL;
394       break;
395
396    }
397    free_jcr(jcr);
398    return NULL;
399 }
400
401 /*
402  * Start a thread to handle getting Device resource information
403  *  from SD. This is called once at startup of the Director.
404  */
405 void init_device_resources()
406 {
407    int status;
408    pthread_t thid;
409
410    Dmsg0(100, "Start Device thread.\n");
411    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
412       berrno be;
413       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
414    }
415 }
416 #endif