]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
Backout vol size tests in previous attempt to fix bug #2349
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2017 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  *   Bacula Director -- msgchan.c -- handles the message channel
21  *    to the Storage daemon and the File daemon.
22  *
23  *     Written by Kern Sibbald, August MM
24  *
25  *    This routine runs as a thread and must be thread reentrant.
26  *
27  *  Basic tasks done here:
28  *    Open a message channel with the Storage daemon
29  *      to authenticate ourself and to pass the JobId.
30  *    Create a thread to interact with the Storage daemon
31  *      who returns a job status and requests Catalog services, etc.
32  */
33
34 #include "bacula.h"
35 #include "dird.h"
36
37 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
38
39 /* Commands sent to Storage daemon */
40 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
41    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
42    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s "
43    "rerunning=%d VolSessionId=%d VolSessionTime=%d sd_client=%d "
44    "Authorization=%s\n";
45 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
46    "pool_type=%s append=%d copy=%d stripe=%d\n";
47 static char use_device[] = "use device=%s\n";
48 //static char query_device[] = _("query device=%s");
49
50 /* Response from Storage daemon */
51 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
52 static char OK_device[]  = "3000 OK use device device=%s\n";
53
54 /* Storage Daemon requests */
55 static char Job_start[]  = "3010 Job %127s start\n";
56 static char Job_end[]    =
57    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u ErrMsg=%256s\n";
58
59 /* Forward referenced functions */
60 extern "C" void *msg_thread(void *arg);
61
62 BSOCK *open_sd_bsock(UAContext *ua)
63 {
64    STORE *store = ua->jcr->wstore;
65
66    if (!is_bsock_open(ua->jcr->store_bsock)) {
67       ua->send_msg(_("Connecting to Storage daemon %s at %s:%d ...\n"),
68          store->name(), store->address, store->SDport);
69       if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
70          ua->error_msg(_("Failed to connect to Storage daemon.\n"));
71          return NULL;
72       }
73    }
74    return ua->jcr->store_bsock;
75 }
76
77 void close_sd_bsock(UAContext *ua)
78 {
79    if (ua->jcr->store_bsock) {
80       ua->jcr->store_bsock->signal(BNET_TERMINATE);
81       free_bsock(ua->jcr->store_bsock);
82    }
83 }
84
85 /*
86  * Establish a message channel connection with the Storage daemon
87  * and perform authentication.
88  */
89 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
90                               int max_retry_time, int verbose)
91 {
92    BSOCK *sd = jcr->store_bsock;
93    STORE *store;
94    utime_t heart_beat;
95
96    if (is_bsock_open(sd)) {
97       return true;                    /* already connected */
98    }
99    if (!sd) {
100       sd = new_bsock();
101    }
102
103    /* If there is a write storage use it */
104    if (jcr->wstore) {
105       store = jcr->wstore;
106    } else {
107       store = jcr->rstore;
108    }
109
110    if (store->heartbeat_interval) {
111       heart_beat = store->heartbeat_interval;
112    } else {
113       heart_beat = director->heartbeat_interval;
114    }
115
116    /*
117     *  Open message channel with the Storage daemon
118     */
119    Dmsg2(100, "Connect to Storage daemon %s:%d\n", store->address,
120       store->SDport);
121    sd->set_source_address(director->DIRsrc_addr);
122    if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
123          store->address, NULL, store->SDport, verbose)) {
124
125       if (!jcr->store_bsock) {  /* The bsock was locally created, so we free it here */
126          free_bsock(sd);
127       }
128       sd = NULL;
129    }
130
131    if (sd == NULL) {
132       return false;
133    }
134    sd->res = (RES *)store;        /* save pointer to other end */
135    jcr->store_bsock = sd;
136
137    if (!authenticate_storage_daemon(jcr, store)) {
138       sd->close();
139       return false;
140    }
141    return true;
142 }
143
144 /*
145  * Here we ask the SD to send us the info for a
146  *  particular device resource.
147  */
148 #ifdef xxx
149 bool update_device_res(JCR *jcr, DEVICE *dev)
150 {
151    POOL_MEM device_name;
152    BSOCK *sd;
153    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
154       return false;
155    }
156    sd = jcr->store_bsock;
157    pm_strcpy(device_name, dev->name());
158    bash_spaces(device_name);
159    sd->fsend(query_device, device_name.c_str());
160    Dmsg1(100, ">stored: %s\n", sd->msg);
161    /* The data is returned through Device_update */
162    if (bget_dirmsg(sd) <= 0) {
163       return false;
164    }
165    return true;
166 }
167 #endif
168
169 static char OKbootstrap[] = "3000 OK bootstrap\n";
170
171 /*
172  * Start a job with the Storage daemon
173  */
174 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
175 {
176    bool ok = true;
177    STORE *storage;
178    BSOCK *sd;
179    char sd_auth_key[100];
180    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
181    POOL_MEM job_name, client_name, fileset_name;
182    int copy = 0;
183    int stripe = 0;
184    char ed1[30], ed2[30];
185    int sd_client;
186
187    sd = jcr->store_bsock;
188    /*
189     * Now send JobId and permissions, and get back the authorization key.
190     */
191    pm_strcpy(job_name, jcr->job->name());
192    bash_spaces(job_name);
193    if (jcr->client) {
194       pm_strcpy(client_name, jcr->client->name());
195    } else {
196       pm_strcpy(client_name, "**Dummy**");
197    }
198    bash_spaces(client_name);
199    pm_strcpy(fileset_name, jcr->fileset->name());
200    bash_spaces(fileset_name);
201    if (jcr->fileset->MD5[0] == 0) {
202       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
203    }
204    /* If rescheduling, cancel the previous incarnation of this job
205     *  with the SD, which might be waiting on the FD connection.
206     *  If we do not cancel it the SD will not accept a new connection
207     *  for the same jobid.
208     */
209    if (jcr->reschedule_count) {
210       sd->fsend("cancel Job=%s\n", jcr->Job);
211       while (sd->recv() >= 0)
212          { }
213    }
214
215    sd_client = jcr->sd_client;
216    if (jcr->sd_auth_key) {
217       bstrncpy(sd_auth_key, jcr->sd_auth_key, sizeof(sd_auth_key));
218    } else {
219       bstrncpy(sd_auth_key, "dummy", sizeof(sd_auth_key));
220    }
221
222    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
223              job_name.c_str(), client_name.c_str(),
224              jcr->getJobType(), jcr->getJobLevel(),
225              fileset_name.c_str(), !jcr->pool->catalog_files,
226              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
227              jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
228              edit_int64(jcr->spool_size, ed2), jcr->rerunning,
229              jcr->VolSessionId, jcr->VolSessionTime, sd_client,
230              sd_auth_key);
231
232    Dmsg1(100, ">stored: %s", sd->msg);
233    Dmsg2(100, "=== rstore=%p wstore=%p\n", rstore, wstore);
234    if (bget_dirmsg(sd) > 0) {
235        Dmsg1(100, "<stored: %s", sd->msg);
236        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
237                   &jcr->VolSessionTime, &sd_auth_key) != 3) {
238           Dmsg1(100, "BadJob=%s\n", sd->msg);
239           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
240           return false;
241        } else {
242           bfree_and_null(jcr->sd_auth_key);
243           jcr->sd_auth_key = bstrdup(sd_auth_key);
244           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
245        }
246    } else {
247       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
248          sd->bstrerror());
249       return false;
250    }
251
252    if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
253        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
254       return false;
255    }
256
257    /*
258     * We have two loops here. The first comes from the
259     *  Storage = associated with the Job, and we need
260     *  to attach to each one.
261     * The inner loop loops over all the alternative devices
262     *  associated with each Storage. It selects the first
263     *  available one.
264     *
265     */
266    /* Do read side of storage daemon */
267    if (ok && rstore) {
268       /* For the moment, only migrate, copy and vbackup have rpool */
269       if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) ||
270            (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
271          pm_strcpy(pool_type, jcr->rpool->pool_type);
272          pm_strcpy(pool_name, jcr->rpool->name());
273       } else {
274          pm_strcpy(pool_type, jcr->pool->pool_type);
275          pm_strcpy(pool_name, jcr->pool->name());
276       }
277       bash_spaces(pool_type);
278       bash_spaces(pool_name);
279       foreach_alist(storage, rstore) {
280          Dmsg1(100, "Rstore=%s\n", storage->name());
281          pm_strcpy(store_name, storage->name());
282          bash_spaces(store_name);
283          if (jcr->media_type) {
284             pm_strcpy(media_type, jcr->media_type);  /* user override */
285          } else {
286             pm_strcpy(media_type, storage->media_type);
287          }
288          bash_spaces(media_type);
289          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
290                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
291          Dmsg1(100, "rstore >stored: %s", sd->msg);
292          DEVICE *dev;
293          /* Loop over alternative storage Devices until one is OK */
294          foreach_alist(dev, storage->device) {
295             pm_strcpy(device_name, dev->name());
296             bash_spaces(device_name);
297             sd->fsend(use_device, device_name.c_str());
298             Dmsg1(100, ">stored: %s", sd->msg);
299          }
300          sd->signal(BNET_EOD);           /* end of Devices */
301       }
302       sd->signal(BNET_EOD);              /* end of Storages */
303       if (bget_dirmsg(sd) > 0) {
304          Dmsg1(100, "<stored: %s", sd->msg);
305          /* ****FIXME**** save actual device name */
306          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
307       } else {
308          ok = false;
309       }
310       if (ok) {
311          Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to read.\n"), device_name.c_str());
312       }
313    }
314
315    /* Do write side of storage daemon */
316    if (ok && wstore) {
317       pm_strcpy(pool_type, jcr->pool->pool_type);
318       pm_strcpy(pool_name, jcr->pool->name());
319       bash_spaces(pool_type);
320       bash_spaces(pool_name);
321       foreach_alist(storage, wstore) {
322          Dmsg1(100, "Wstore=%s\n", storage->name());
323          pm_strcpy(store_name, storage->name());
324          bash_spaces(store_name);
325          pm_strcpy(media_type, storage->media_type);
326          bash_spaces(media_type);
327          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
328                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
329
330          Dmsg1(100, "wstore >stored: %s", sd->msg);
331          DEVICE *dev;
332          /* Loop over alternative storage Devices until one is OK */
333          foreach_alist(dev, storage->device) {
334             pm_strcpy(device_name, dev->name());
335             bash_spaces(device_name);
336             sd->fsend(use_device, device_name.c_str());
337             Dmsg1(100, ">stored: %s", sd->msg);
338          }
339          sd->signal(BNET_EOD);           /* end of Devices */
340       }
341       sd->signal(BNET_EOD);              /* end of Storages */
342       if (bget_dirmsg(sd) > 0) {
343          Dmsg1(100, "<stored: %s", sd->msg);
344          /* ****FIXME**** save actual device name */
345          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
346       } else {
347          ok = false;
348       }
349       if (ok) {
350          Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to write.\n"), device_name.c_str());
351       }
352    }
353    if (!ok) {
354       POOL_MEM err_msg;
355       if (sd->msg[0]) {
356          pm_strcpy(err_msg, sd->msg); /* save message */
357          Jmsg(jcr, M_FATAL, 0, _("\n"
358               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
359               device_name.c_str(), err_msg.c_str()/* sd->msg */);
360       } else {
361          Jmsg(jcr, M_FATAL, 0, _("\n"
362               "     Storage daemon didn't accept Device \"%s\" command.\n"),
363               device_name.c_str());
364       }
365    }
366    return ok;
367 }
368
369 /*
370  * Start a thread to handle Storage daemon messages and
371  *  Catalog requests.
372  */
373 bool start_storage_daemon_message_thread(JCR *jcr)
374 {
375    int status;
376    pthread_t thid;
377
378    jcr->inc_use_count();              /* mark in use by msg thread */
379    jcr->sd_msg_thread_done = false;
380    jcr->SD_msg_chan_started = false;
381    Dmsg0(150, "Start SD msg_thread.\n");
382    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
383       berrno be;
384       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
385    }
386    /* Wait for thread to start */
387    while (jcr->SD_msg_chan_started == false) {
388       bmicrosleep(0, 50);
389       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
390          return false;
391       }
392    }
393    Dmsg1(150, "SD msg_thread started. use=%d\n", jcr->use_count());
394    return true;
395 }
396
397 extern "C" void msg_thread_cleanup(void *arg)
398 {
399    JCR *jcr = (JCR *)arg;
400    db_end_transaction(jcr, jcr->db);      /* terminate any open transaction */
401    jcr->lock();
402    jcr->sd_msg_thread_done = true;
403    jcr->SD_msg_chan_started = false;
404    jcr->unlock();
405    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
406    Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
407    db_thread_cleanup(jcr->db);           /* remove thread specific data */
408    free_jcr(jcr);                        /* release jcr */
409 }
410
411 /*
412  * Handle the message channel (i.e. requests from the
413  *  Storage daemon).
414  * Note, we are running in a separate thread.
415  */
416 extern "C" void *msg_thread(void *arg)
417 {
418    JCR *jcr = (JCR *)arg;
419    BSOCK *sd;
420    int JobStatus;
421    int n;
422    char Job[MAX_NAME_LENGTH];
423    char ErrMsg[256];
424    uint32_t JobFiles, JobErrors;
425    uint64_t JobBytes;
426    ErrMsg[0] = 0;
427
428    pthread_detach(pthread_self());
429    set_jcr_in_tsd(jcr);
430    jcr->SD_msg_chan = pthread_self();
431    jcr->SD_msg_chan_started = true;
432    pthread_cleanup_push(msg_thread_cleanup, arg);
433    sd = jcr->store_bsock;
434
435    /* Read the Storage daemon's output.
436     */
437    Dmsg0(100, "Start msg_thread loop\n");
438    n = 0;
439    while (!job_canceled(jcr) && (n=bget_dirmsg(sd)) >= 0) {
440       Dmsg1(400, "<stored: %s", sd->msg);
441       if (sscanf(sd->msg, Job_start, Job) == 1) {
442          continue;
443       }
444       if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
445                  &JobBytes, &JobErrors, ErrMsg) == 6) {
446          jcr->SDJobStatus = JobStatus; /* termination status */
447          jcr->SDJobFiles = JobFiles;
448          jcr->SDJobBytes = JobBytes;
449          jcr->SDErrors = JobErrors;
450          unbash_spaces(ErrMsg); /* Error message if any */
451          pm_strcpy(jcr->StatusErrMsg, ErrMsg);
452          break;
453       }
454       Dmsg1(400, "end loop use=%d\n", jcr->use_count());
455    }
456    if (n == BNET_HARDEOF && jcr->getJobStatus() != JS_Canceled) {
457       /*
458        * This probably should be M_FATAL, but I am not 100% sure
459        *  that this return *always* corresponds to a dropped line.
460        */
461       Qmsg(jcr, M_ERROR, 0, _("Director's connection to SD for this Job was lost.\n"));
462    }
463    if (jcr->getJobStatus() == JS_Canceled) {
464       jcr->SDJobStatus = JS_Canceled;
465    } else if (sd->is_error()) {
466       jcr->SDJobStatus = JS_ErrorTerminated;
467    }
468    pthread_cleanup_pop(1);            /* remove and execute the handler */
469    return NULL;
470 }
471
472 void wait_for_storage_daemon_termination(JCR *jcr)
473 {
474    int cancel_count = 0;
475    /* Now wait for Storage daemon to terminate our message thread */
476    while (!jcr->sd_msg_thread_done) {
477       struct timeval tv;
478       struct timezone tz;
479       struct timespec timeout;
480
481       gettimeofday(&tv, &tz);
482       timeout.tv_nsec = 0;
483       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
484       Dmsg0(400, "I'm waiting for message thread termination.\n");
485       P(mutex);
486       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
487       V(mutex);
488       if (jcr->is_canceled()) {
489          if (jcr->SD_msg_chan_started) {
490             jcr->store_bsock->set_timed_out();
491             jcr->store_bsock->set_terminated();
492             sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL);
493          }
494          cancel_count++;
495       }
496       /* Give SD 30 seconds to clean up after cancel */
497       if (cancel_count == 6) {
498          break;
499       }
500    }
501    jcr->setJobStatus(JS_Terminated);
502 }
503
504 /*
505  * Send bootstrap file to Storage daemon.
506  *  This is used for restore, verify VolumeToCatalog, migration,
507  *    and copy Jobs.
508  */
509 bool send_bootstrap_file(JCR *jcr, BSOCK *sd)
510 {
511    FILE *bs;
512    char buf[1000];
513    const char *bootstrap = "bootstrap\n";
514
515    Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap);
516    if (!jcr->RestoreBootstrap) {
517       return true;
518    }
519    bs = bfopen(jcr->RestoreBootstrap, "rb");
520    if (!bs) {
521       berrno be;
522       Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
523          jcr->RestoreBootstrap, be.bstrerror());
524       jcr->setJobStatus(JS_ErrorTerminated);
525       return false;
526    }
527    sd->fsend(bootstrap);
528    while (fgets(buf, sizeof(buf), bs)) {
529       sd->fsend("%s", buf);
530    }
531    sd->signal(BNET_EOD);
532    fclose(bs);
533    if (jcr->unlink_bsr) {
534       unlink(jcr->RestoreBootstrap);
535       jcr->unlink_bsr = false;
536    }
537    return true;
538 }
539
540
541 #ifdef needed
542 #define MAX_TRIES 30
543 #define WAIT_TIME 2
544 extern "C" void *device_thread(void *arg)
545 {
546    int i;
547    JCR *jcr;
548    DEVICE *dev;
549
550
551    pthread_detach(pthread_self());
552    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
553    for (i=0; i < MAX_TRIES; i++) {
554       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
555          Dmsg0(900, "Failed connecting to SD.\n");
556          continue;
557       }
558       LockRes();
559       foreach_res(dev, R_DEVICE) {
560          if (!update_device_res(jcr, dev)) {
561             Dmsg1(900, "Error updating device=%s\n", dev->name());
562          } else {
563             Dmsg1(900, "Updated Device=%s\n", dev->name());
564          }
565       }
566       UnlockRes();
567       free_bsock(jcr->store_bsock);
568       break;
569
570    }
571    free_jcr(jcr);
572    return NULL;
573 }
574
575 /*
576  * Start a thread to handle getting Device resource information
577  *  from SD. This is called once at startup of the Director.
578  */
579 void init_device_resources()
580 {
581    int status;
582    pthread_t thid;
583
584    Dmsg0(100, "Start Device thread.\n");
585    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
586       berrno be;
587       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
588    }
589 }
590 #endif