]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
kes Reduce bconsole help to fit in 80 columns
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *
30  *   Bacula Director -- msgchan.c -- handles the message channel
31  *    to the Storage daemon and the File daemon.
32  *
33  *     Kern Sibbald, August MM
34  *
35  *    This routine runs as a thread and must be thread reentrant.
36  *
37  *  Basic tasks done here:
38  *    Open a message channel with the Storage daemon
39  *      to authenticate ourself and to pass the JobId.
40  *    Create a thread to interact with the Storage daemon
41  *      who returns a job status and requests Catalog services, etc.
42  *
43  *   Version $Id$
44  */
45
46 #include "bacula.h"
47 #include "dird.h"
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 /* Commands sent to Storage daemon */
52 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
53    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd;
80    STORE *store;
81    utime_t heart_beat;    
82
83    if (jcr->store_bsock) {
84       return true;                    /* already connected */
85    }
86
87    /* If there is a write storage use it */
88    if (jcr->wstore) {
89       store = jcr->wstore;
90    } else {
91       store = jcr->rstore;
92    }
93
94    if (store->heartbeat_interval) {
95       heart_beat = store->heartbeat_interval;
96    } else {           
97       heart_beat = director->heartbeat_interval;
98    }
99
100    /*
101     *  Open message channel with the Storage daemon
102     */
103    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
104       store->SDport);
105    sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat,
106           _("Storage daemon"), store->address,
107           NULL, store->SDport, verbose);
108    if (sd == NULL) {
109       return false;
110    }
111    sd->res = (RES *)store;        /* save pointer to other end */
112    jcr->store_bsock = sd;
113
114    if (!authenticate_storage_daemon(jcr, store)) {
115       sd->close();
116       jcr->store_bsock = NULL;
117       return false;
118    }
119    return true;
120 }
121
122 /*
123  * Here we ask the SD to send us the info for a 
124  *  particular device resource.
125  */
126 #ifdef xxx
127 bool update_device_res(JCR *jcr, DEVICE *dev)
128 {
129    POOL_MEM device_name; 
130    BSOCK *sd;
131    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
132       return false;
133    }
134    sd = jcr->store_bsock;
135    pm_strcpy(device_name, dev->name());
136    bash_spaces(device_name);
137    sd->fsend(query_device, device_name.c_str());
138    Dmsg1(100, ">stored: %s\n", sd->msg);
139    /* The data is returned through Device_update */
140    if (bget_dirmsg(sd) <= 0) {
141       return false;
142    }
143    return true;
144 }
145 #endif
146
147 static char OKbootstrap[] = "3000 OK bootstrap\n";
148
149 /*
150  * Start a job with the Storage daemon
151  */
152 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
153 {
154    bool ok = true;
155    STORE *storage;
156    BSOCK *sd;
157    char auth_key[100];
158    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
159    POOL_MEM job_name, client_name, fileset_name;
160    int copy = 0;
161    int stripe = 0;
162    char ed1[30], ed2[30];
163
164    sd = jcr->store_bsock;
165    /*
166     * Now send JobId and permissions, and get back the authorization key.
167     */
168    pm_strcpy(job_name, jcr->job->name());
169    bash_spaces(job_name);
170    pm_strcpy(client_name, jcr->client->name());
171    bash_spaces(client_name);
172    pm_strcpy(fileset_name, jcr->fileset->name());
173    bash_spaces(fileset_name);
174    if (jcr->fileset->MD5[0] == 0) {
175       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
176    }
177    /* If rescheduling, cancel the previous incarnation of this job
178     *  with the SD, which might be waiting on the FD connection.
179     *  If we do not cancel it the SD will not accept a new connection
180     *  for the same jobid.
181     */
182    if (jcr->reschedule_count) {
183       sd->fsend("cancel Job=%s\n", jcr->Job);
184       while (sd->recv() >= 0)
185          { }
186    } 
187    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
188              job_name.c_str(), client_name.c_str(), 
189              jcr->get_JobType(), jcr->get_JobLevel(),
190              fileset_name.c_str(), !jcr->pool->catalog_files,
191              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
192              jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
193              edit_int64(jcr->spool_size, ed2));
194    Dmsg1(100, ">stored: %s", sd->msg);
195    if (bget_dirmsg(sd) > 0) {
196        Dmsg1(100, "<stored: %s", sd->msg);
197        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
198                   &jcr->VolSessionTime, &auth_key) != 3) {
199           Dmsg1(100, "BadJob=%s\n", sd->msg);
200           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
201           return false;
202        } else {
203           jcr->sd_auth_key = bstrdup(auth_key);
204           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
205        }
206    } else {
207       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
208          sd->bstrerror());
209       return false;
210    }
211
212    if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
213        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
214       return false;
215    }
216
217    /*
218     * We have two loops here. The first comes from the 
219     *  Storage = associated with the Job, and we need 
220     *  to attach to each one.
221     * The inner loop loops over all the alternative devices
222     *  associated with each Storage. It selects the first
223     *  available one.
224     *
225     */
226    /* Do read side of storage daemon */
227    if (ok && rstore) {
228       /* For the moment, only migrate, copy and vbackup have rpool */
229       if (jcr->get_JobType() == JT_MIGRATE || jcr->get_JobType() == JT_COPY ||
230            (jcr->get_JobType() == JT_BACKUP && jcr->get_JobLevel() == L_VIRTUAL_FULL)) {
231          pm_strcpy(pool_type, jcr->rpool->pool_type);
232          pm_strcpy(pool_name, jcr->rpool->name());
233       } else {
234          pm_strcpy(pool_type, jcr->pool->pool_type);
235          pm_strcpy(pool_name, jcr->pool->name());
236       }
237       bash_spaces(pool_type);
238       bash_spaces(pool_name);
239       foreach_alist(storage, rstore) {
240          Dmsg1(100, "Rstore=%s\n", storage->name());
241          pm_strcpy(store_name, storage->name());
242          bash_spaces(store_name);
243          pm_strcpy(media_type, storage->media_type);
244          bash_spaces(media_type);
245          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
246                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
247          Dmsg1(100, "rstore >stored: %s", sd->msg);
248          DEVICE *dev;
249          /* Loop over alternative storage Devices until one is OK */
250          foreach_alist(dev, storage->device) {
251             pm_strcpy(device_name, dev->name());
252             bash_spaces(device_name);
253             sd->fsend(use_device, device_name.c_str());
254             Dmsg1(100, ">stored: %s", sd->msg);
255          }
256          sd->signal(BNET_EOD);           /* end of Devices */
257       }
258       sd->signal(BNET_EOD);              /* end of Storages */
259       if (bget_dirmsg(sd) > 0) {
260          Dmsg1(100, "<stored: %s", sd->msg);
261          /* ****FIXME**** save actual device name */
262          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
263       } else {
264          ok = false;
265       }
266    }
267
268    /* Do write side of storage daemon */
269    if (ok && wstore) {
270       pm_strcpy(pool_type, jcr->pool->pool_type);
271       pm_strcpy(pool_name, jcr->pool->name());
272       bash_spaces(pool_type);
273       bash_spaces(pool_name);
274       foreach_alist(storage, wstore) {
275          pm_strcpy(store_name, storage->name());
276          bash_spaces(store_name);
277          pm_strcpy(media_type, storage->media_type);
278          bash_spaces(media_type);
279          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
280                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
281
282          Dmsg1(100, "wstore >stored: %s", sd->msg);
283          DEVICE *dev;
284          /* Loop over alternative storage Devices until one is OK */
285          foreach_alist(dev, storage->device) {
286             pm_strcpy(device_name, dev->name());
287             bash_spaces(device_name);
288             sd->fsend(use_device, device_name.c_str());
289             Dmsg1(100, ">stored: %s", sd->msg);
290          }
291          sd->signal(BNET_EOD);           /* end of Devices */
292       }
293       sd->signal(BNET_EOD);              /* end of Storages */
294       if (bget_dirmsg(sd) > 0) {
295          Dmsg1(100, "<stored: %s", sd->msg);
296          /* ****FIXME**** save actual device name */
297          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
298       } else {
299          ok = false;
300       }
301    }
302    if (!ok) {
303       POOL_MEM err_msg;
304       if (sd->msg[0]) {
305          pm_strcpy(err_msg, sd->msg); /* save message */
306          Jmsg(jcr, M_FATAL, 0, _("\n"
307               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
308               device_name.c_str(), err_msg.c_str()/* sd->msg */);
309       } else { 
310          Jmsg(jcr, M_FATAL, 0, _("\n"
311               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
312               device_name.c_str());
313       }
314    } else {
315       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
316    }
317    return ok;
318 }
319
320 /*
321  * Start a thread to handle Storage daemon messages and
322  *  Catalog requests.
323  */
324 bool start_storage_daemon_message_thread(JCR *jcr)
325 {
326    int status;
327    pthread_t thid;
328
329    jcr->inc_use_count();              /* mark in use by msg thread */
330    jcr->sd_msg_thread_done = false;
331    jcr->SD_msg_chan = 0;
332    Dmsg0(100, "Start SD msg_thread.\n");
333    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
334       berrno be;
335       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
336    }
337    /* Wait for thread to start */
338    while (jcr->SD_msg_chan == 0) {
339       bmicrosleep(0, 50);
340       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
341          return false;
342       }
343    }
344    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
345    return true;
346 }
347
348 extern "C" void msg_thread_cleanup(void *arg)
349 {
350    JCR *jcr = (JCR *)arg;
351    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
352    jcr->sd_msg_thread_done = true;
353    jcr->SD_msg_chan = 0;
354    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
355    Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
356    free_jcr(jcr);                     /* release jcr */
357    db_thread_cleanup();               /* remove thread specific data */
358 }
359
360 /*
361  * Handle the message channel (i.e. requests from the
362  *  Storage daemon).
363  * Note, we are running in a separate thread.
364  */
365 extern "C" void *msg_thread(void *arg)
366 {
367    JCR *jcr = (JCR *)arg;
368    BSOCK *sd;
369    int JobStatus;
370    char Job[MAX_NAME_LENGTH];
371    uint32_t JobFiles, JobErrors;
372    uint64_t JobBytes;
373
374    pthread_detach(pthread_self());
375    set_jcr_in_tsd(jcr);
376    jcr->SD_msg_chan = pthread_self();
377    pthread_cleanup_push(msg_thread_cleanup, arg);
378    sd = jcr->store_bsock;
379
380    /* Read the Storage daemon's output.
381     */
382    Dmsg0(100, "Start msg_thread loop\n");
383    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
384       Dmsg1(400, "<stored: %s", sd->msg);
385       if (sscanf(sd->msg, Job_start, Job) == 1) {
386          continue;
387       }
388       if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
389                  &JobBytes, &JobErrors) == 5) {
390          jcr->SDJobStatus = JobStatus; /* termination status */
391          jcr->SDJobFiles = JobFiles;
392          jcr->SDJobBytes = JobBytes;
393          jcr->SDErrors = JobErrors;
394          break;
395       }
396       Dmsg1(400, "end loop use=%d\n", jcr->use_count());
397    }
398    if (is_bnet_error(sd)) {
399       jcr->SDJobStatus = JS_ErrorTerminated;
400    }
401    pthread_cleanup_pop(1);            /* remove and execute the handler */
402    return NULL;
403 }
404
405 void wait_for_storage_daemon_termination(JCR *jcr)
406 {
407    int cancel_count = 0;
408    /* Now wait for Storage daemon to terminate our message thread */
409    while (!jcr->sd_msg_thread_done) {
410       struct timeval tv;
411       struct timezone tz;
412       struct timespec timeout;
413
414       gettimeofday(&tv, &tz);
415       timeout.tv_nsec = 0;
416       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
417       Dmsg0(400, "I'm waiting for message thread termination.\n");
418       P(mutex);
419       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
420       V(mutex);
421       if (job_canceled(jcr)) {
422          if (jcr->SD_msg_chan) {
423             jcr->store_bsock->set_timed_out();
424             jcr->store_bsock->set_terminated();
425             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
426             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
427          }
428          cancel_count++;
429       }
430       /* Give SD 30 seconds to clean up after cancel */
431       if (cancel_count == 6) {
432          break;
433       }
434    }
435    set_jcr_job_status(jcr, JS_Terminated);
436 }
437
438 #ifdef needed
439 #define MAX_TRIES 30
440 #define WAIT_TIME 2
441 extern "C" void *device_thread(void *arg)
442 {
443    int i;
444    JCR *jcr;
445    DEVICE *dev;
446
447
448    pthread_detach(pthread_self());
449    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
450    for (i=0; i < MAX_TRIES; i++) {
451       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
452          Dmsg0(900, "Failed connecting to SD.\n");
453          continue;
454       }
455       LockRes();
456       foreach_res(dev, R_DEVICE) {
457          if (!update_device_res(jcr, dev)) {
458             Dmsg1(900, "Error updating device=%s\n", dev->name());
459          } else {
460             Dmsg1(900, "Updated Device=%s\n", dev->name());
461          }
462       }
463       UnlockRes();
464       bnet_close(jcr->store_bsock);
465       jcr->store_bsock = NULL;
466       break;
467
468    }
469    free_jcr(jcr);
470    return NULL;
471 }
472
473 /*
474  * Start a thread to handle getting Device resource information
475  *  from SD. This is called once at startup of the Director.
476  */
477 void init_device_resources()
478 {
479    int status;
480    pthread_t thid;
481
482    Dmsg0(100, "Start Device thread.\n");
483    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
484       berrno be;
485       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
486    }
487 }
488 #endif