]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
Fix #2085 About director segfault in cram-md5 function
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula(R) - The Network Backup Solution
3
4    Copyright (C) 2000-2015 Kern Sibbald
5
6    The original author of Bacula is Kern Sibbald, with contributions
7    from many others, a complete list can be found in the file AUTHORS.
8
9    You may use this file and others of this release according to the
10    license defined in the LICENSE file, which includes the Affero General
11    Public License, v3.0 ("AGPLv3") and some additional permissions and
12    terms pursuant to its AGPLv3 Section 7.
13
14    This notice must be preserved when any source code is 
15    conveyed and/or propagated.
16
17    Bacula(R) is a registered trademark of Kern Sibbald.
18 */
19 /*
20  *
21  *   Bacula Director -- msgchan.c -- handles the message channel
22  *    to the Storage daemon and the File daemon.
23  *
24  *     Written by Kern Sibbald, August MM
25  *
26  *    This routine runs as a thread and must be thread reentrant.
27  *
28  *  Basic tasks done here:
29  *    Open a message channel with the Storage daemon
30  *      to authenticate ourself and to pass the JobId.
31  *    Create a thread to interact with the Storage daemon
32  *      who returns a job status and requests Catalog services, etc.
33  *
34  */
35
36 #include "bacula.h"
37 #include "dird.h"
38
39 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
40
41 /* Commands sent to Storage daemon */
42 static char jobcmd[] = "JobId=%s job=%s job_name=%s client_name=%s "
43    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
44    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s "
45    "rerunning=%d VolSessionId=%d VolSessionTime=%d sd_client=%d "
46    "Authorization=%s\n";
47 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
48    "pool_type=%s append=%d copy=%d stripe=%d\n";
49 static char use_device[] = "use device=%s\n";
50 //static char query_device[] = _("query device=%s");
51
52 /* Response from Storage daemon */
53 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
54 static char OK_device[]  = "3000 OK use device device=%s\n";
55
56 /* Storage Daemon requests */
57 static char Job_start[]  = "3010 Job %127s start\n";
58 static char Job_end[]    =
59    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";
60
61 /* Forward referenced functions */
62 extern "C" void *msg_thread(void *arg);
63
64 BSOCK *open_sd_bsock(UAContext *ua)
65 {
66    STORE *store = ua->jcr->wstore;
67
68    if (!is_bsock_open(ua->jcr->store_bsock)) {
69       ua->send_msg(_("Connecting to Storage daemon %s at %s:%d ...\n"),
70          store->name(), store->address, store->SDport);
71       if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) {
72          ua->error_msg(_("Failed to connect to Storage daemon.\n"));
73          return NULL;
74       }
75    }
76    return ua->jcr->store_bsock;
77 }
78
79 void close_sd_bsock(UAContext *ua)
80 {
81    if (ua->jcr->store_bsock) {
82       ua->jcr->store_bsock->signal(BNET_TERMINATE);
83       free_bsock(ua->jcr->store_bsock);
84    }
85 }
86
87 /*
88  * Establish a message channel connection with the Storage daemon
89  * and perform authentication.
90  */
91 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
92                               int max_retry_time, int verbose)
93 {
94    BSOCK *sd = jcr->store_bsock;
95    STORE *store;
96    utime_t heart_beat;
97
98    if (is_bsock_open(sd)) {
99       return true;                    /* already connected */
100    }
101    if (!sd) {
102       sd = new_bsock();
103    }
104
105    /* If there is a write storage use it */
106    if (jcr->wstore) {
107       store = jcr->wstore;
108    } else {
109       store = jcr->rstore;
110    }
111
112    if (store->heartbeat_interval) {
113       heart_beat = store->heartbeat_interval;
114    } else {
115       heart_beat = director->heartbeat_interval;
116    }
117
118    /*
119     *  Open message channel with the Storage daemon
120     */
121    Dmsg2(100, "Connect to Storage daemon %s:%d\n", store->address,
122       store->SDport);
123    sd->set_source_address(director->DIRsrc_addr);
124    if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
125          store->address, NULL, store->SDport, verbose)) {
126
127       if (!jcr->store_bsock) {  /* The bsock was locally created, so we free it here */
128          free_bsock(sd);
129       }
130       sd = NULL;
131    }
132
133    if (sd == NULL) {
134       return false;
135    }
136    sd->res = (RES *)store;        /* save pointer to other end */
137    jcr->store_bsock = sd;
138
139    if (!authenticate_storage_daemon(jcr, store)) {
140       sd->close();
141       return false;
142    }
143    return true;
144 }
145
146 /*
147  * Here we ask the SD to send us the info for a
148  *  particular device resource.
149  */
150 #ifdef xxx
151 bool update_device_res(JCR *jcr, DEVICE *dev)
152 {
153    POOL_MEM device_name;
154    BSOCK *sd;
155    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
156       return false;
157    }
158    sd = jcr->store_bsock;
159    pm_strcpy(device_name, dev->name());
160    bash_spaces(device_name);
161    sd->fsend(query_device, device_name.c_str());
162    Dmsg1(100, ">stored: %s\n", sd->msg);
163    /* The data is returned through Device_update */
164    if (bget_dirmsg(sd) <= 0) {
165       return false;
166    }
167    return true;
168 }
169 #endif
170
171 static char OKbootstrap[] = "3000 OK bootstrap\n";
172
173 /*
174  * Start a job with the Storage daemon
175  */
176 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
177 {
178    bool ok = true;
179    STORE *storage;
180    BSOCK *sd;
181    char sd_auth_key[100];
182    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
183    POOL_MEM job_name, client_name, fileset_name;
184    int copy = 0;
185    int stripe = 0;
186    char ed1[30], ed2[30];
187    int sd_client;
188
189    sd = jcr->store_bsock;
190    /*
191     * Now send JobId and permissions, and get back the authorization key.
192     */
193    pm_strcpy(job_name, jcr->job->name());
194    bash_spaces(job_name);
195    if (jcr->client) {
196       pm_strcpy(client_name, jcr->client->name());
197    } else {
198       pm_strcpy(client_name, "**Dummy**");
199    }
200    bash_spaces(client_name);
201    pm_strcpy(fileset_name, jcr->fileset->name());
202    bash_spaces(fileset_name);
203    if (jcr->fileset->MD5[0] == 0) {
204       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
205    }
206    /* If rescheduling, cancel the previous incarnation of this job
207     *  with the SD, which might be waiting on the FD connection.
208     *  If we do not cancel it the SD will not accept a new connection
209     *  for the same jobid.
210     */
211    if (jcr->reschedule_count) {
212       sd->fsend("cancel Job=%s\n", jcr->Job);
213       while (sd->recv() >= 0)
214          { }
215    }
216
217    sd_client = jcr->sd_client;
218    if (jcr->sd_auth_key) {
219       bstrncpy(sd_auth_key, jcr->sd_auth_key, sizeof(sd_auth_key));
220    } else {
221       bstrncpy(sd_auth_key, "dummy", sizeof(sd_auth_key));
222    }
223
224    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job,
225              job_name.c_str(), client_name.c_str(),
226              jcr->getJobType(), jcr->getJobLevel(),
227              fileset_name.c_str(), !jcr->pool->catalog_files,
228              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data,
229              jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
230              edit_int64(jcr->spool_size, ed2), jcr->rerunning,
231              jcr->VolSessionId, jcr->VolSessionTime, sd_client,
232              sd_auth_key);
233
234    Dmsg1(100, ">stored: %s", sd->msg);
235    Dmsg2(100, "=== rstore=%p wstore=%p\n", rstore, wstore);
236    if (bget_dirmsg(sd) > 0) {
237        Dmsg1(100, "<stored: %s", sd->msg);
238        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
239                   &jcr->VolSessionTime, &sd_auth_key) != 3) {
240           Dmsg1(100, "BadJob=%s\n", sd->msg);
241           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
242           return false;
243        } else {
244           bfree_and_null(jcr->sd_auth_key);
245           jcr->sd_auth_key = bstrdup(sd_auth_key);
246           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
247        }
248    } else {
249       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
250          sd->bstrerror());
251       return false;
252    }
253
254    if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
255        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
256       return false;
257    }
258
259    /*
260     * We have two loops here. The first comes from the
261     *  Storage = associated with the Job, and we need
262     *  to attach to each one.
263     * The inner loop loops over all the alternative devices
264     *  associated with each Storage. It selects the first
265     *  available one.
266     *
267     */
268    /* Do read side of storage daemon */
269    if (ok && rstore) {
270       /* For the moment, only migrate, copy and vbackup have rpool */
271       if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) ||
272            (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
273          pm_strcpy(pool_type, jcr->rpool->pool_type);
274          pm_strcpy(pool_name, jcr->rpool->name());
275       } else {
276          pm_strcpy(pool_type, jcr->pool->pool_type);
277          pm_strcpy(pool_name, jcr->pool->name());
278       }
279       bash_spaces(pool_type);
280       bash_spaces(pool_name);
281       foreach_alist(storage, rstore) {
282          Dmsg1(100, "Rstore=%s\n", storage->name());
283          pm_strcpy(store_name, storage->name());
284          bash_spaces(store_name);
285          if (jcr->media_type) {
286             pm_strcpy(media_type, jcr->media_type);  /* user override */
287          } else {
288             pm_strcpy(media_type, storage->media_type);
289          }
290          bash_spaces(media_type);
291          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
292                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
293          Dmsg1(100, "rstore >stored: %s", sd->msg);
294          DEVICE *dev;
295          /* Loop over alternative storage Devices until one is OK */
296          foreach_alist(dev, storage->device) {
297             pm_strcpy(device_name, dev->name());
298             bash_spaces(device_name);
299             sd->fsend(use_device, device_name.c_str());
300             Dmsg1(100, ">stored: %s", sd->msg);
301          }
302          sd->signal(BNET_EOD);           /* end of Devices */
303       }
304       sd->signal(BNET_EOD);              /* end of Storages */
305       if (bget_dirmsg(sd) > 0) {
306          Dmsg1(100, "<stored: %s", sd->msg);
307          /* ****FIXME**** save actual device name */
308          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
309       } else {
310          ok = false;
311       }
312       if (ok) {
313          Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to read.\n"), device_name.c_str());
314       }
315    }
316
317    /* Do write side of storage daemon */
318    if (ok && wstore) {
319       pm_strcpy(pool_type, jcr->pool->pool_type);
320       pm_strcpy(pool_name, jcr->pool->name());
321       bash_spaces(pool_type);
322       bash_spaces(pool_name);
323       foreach_alist(storage, wstore) {
324          Dmsg1(100, "Wstore=%s\n", storage->name());
325          pm_strcpy(store_name, storage->name());
326          bash_spaces(store_name);
327          pm_strcpy(media_type, storage->media_type);
328          bash_spaces(media_type);
329          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(),
330                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
331
332          Dmsg1(100, "wstore >stored: %s", sd->msg);
333          DEVICE *dev;
334          /* Loop over alternative storage Devices until one is OK */
335          foreach_alist(dev, storage->device) {
336             pm_strcpy(device_name, dev->name());
337             bash_spaces(device_name);
338             sd->fsend(use_device, device_name.c_str());
339             Dmsg1(100, ">stored: %s", sd->msg);
340          }
341          sd->signal(BNET_EOD);           /* end of Devices */
342       }
343       sd->signal(BNET_EOD);              /* end of Storages */
344       if (bget_dirmsg(sd) > 0) {
345          Dmsg1(100, "<stored: %s", sd->msg);
346          /* ****FIXME**** save actual device name */
347          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
348       } else {
349          ok = false;
350       }
351       if (ok) {
352          Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\" to write.\n"), device_name.c_str());
353       }
354    }
355    if (!ok) {
356       POOL_MEM err_msg;
357       if (sd->msg[0]) {
358          pm_strcpy(err_msg, sd->msg); /* save message */
359          Jmsg(jcr, M_FATAL, 0, _("\n"
360               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
361               device_name.c_str(), err_msg.c_str()/* sd->msg */);
362       } else {
363          Jmsg(jcr, M_FATAL, 0, _("\n"
364               "     Storage daemon didn't accept Device \"%s\" command.\n"),
365               device_name.c_str());
366       }
367    }
368    return ok;
369 }
370
371 /*
372  * Start a thread to handle Storage daemon messages and
373  *  Catalog requests.
374  */
375 bool start_storage_daemon_message_thread(JCR *jcr)
376 {
377    int status;
378    pthread_t thid;
379
380    jcr->inc_use_count();              /* mark in use by msg thread */
381    jcr->sd_msg_thread_done = false;
382    jcr->SD_msg_chan_started = false;
383    Dmsg0(150, "Start SD msg_thread.\n");
384    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
385       berrno be;
386       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
387    }
388    /* Wait for thread to start */
389    while (jcr->SD_msg_chan_started == false) {
390       bmicrosleep(0, 50);
391       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
392          return false;
393       }
394    }
395    Dmsg1(150, "SD msg_thread started. use=%d\n", jcr->use_count());
396    return true;
397 }
398
399 extern "C" void msg_thread_cleanup(void *arg)
400 {
401    JCR *jcr = (JCR *)arg;
402    db_end_transaction(jcr, jcr->db);      /* terminate any open transaction */
403    jcr->lock();
404    jcr->sd_msg_thread_done = true;
405    jcr->SD_msg_chan_started = false;
406    jcr->unlock();
407    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
408    Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
409    db_thread_cleanup(jcr->db);           /* remove thread specific data */
410    free_jcr(jcr);                        /* release jcr */
411 }
412
413 /*
414  * Handle the message channel (i.e. requests from the
415  *  Storage daemon).
416  * Note, we are running in a separate thread.
417  */
418 extern "C" void *msg_thread(void *arg)
419 {
420    JCR *jcr = (JCR *)arg;
421    BSOCK *sd;
422    int JobStatus;
423    int n;
424    char Job[MAX_NAME_LENGTH];
425    uint32_t JobFiles, JobErrors;
426    uint64_t JobBytes;
427
428    pthread_detach(pthread_self());
429    set_jcr_in_tsd(jcr);
430    jcr->SD_msg_chan = pthread_self();
431    jcr->SD_msg_chan_started = true;
432    pthread_cleanup_push(msg_thread_cleanup, arg);
433    sd = jcr->store_bsock;
434
435    /* Read the Storage daemon's output.
436     */
437    Dmsg0(100, "Start msg_thread loop\n");
438    n = 0;
439    while (!job_canceled(jcr) && (n=bget_dirmsg(sd)) >= 0) {
440       Dmsg1(400, "<stored: %s", sd->msg);
441       if (sscanf(sd->msg, Job_start, Job) == 1) {
442          continue;
443       }
444       if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
445                  &JobBytes, &JobErrors) == 5) {
446          jcr->SDJobStatus = JobStatus; /* termination status */
447          jcr->SDJobFiles = JobFiles;
448          jcr->SDJobBytes = JobBytes;
449          jcr->SDErrors = JobErrors;
450          break;
451       }
452       Dmsg1(400, "end loop use=%d\n", jcr->use_count());
453    }
454    if (n == BNET_HARDEOF && jcr->getJobStatus() != JS_Canceled) {
455       /*
456        * This probably should be M_FATAL, but I am not 100% sure
457        *  that this return *always* corresponds to a dropped line.
458        */
459       Qmsg(jcr, M_ERROR, 0, _("Director's connection to SD for this Job was lost.\n"));
460    }
461    if (jcr->getJobStatus() == JS_Canceled) {
462       jcr->SDJobStatus = JS_Canceled;
463    } else if (sd->is_error()) {
464       jcr->SDJobStatus = JS_ErrorTerminated;
465    }
466    pthread_cleanup_pop(1);            /* remove and execute the handler */
467    return NULL;
468 }
469
470 void wait_for_storage_daemon_termination(JCR *jcr)
471 {
472    int cancel_count = 0;
473    /* Now wait for Storage daemon to terminate our message thread */
474    while (!jcr->sd_msg_thread_done) {
475       struct timeval tv;
476       struct timezone tz;
477       struct timespec timeout;
478
479       gettimeofday(&tv, &tz);
480       timeout.tv_nsec = 0;
481       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
482       Dmsg0(400, "I'm waiting for message thread termination.\n");
483       P(mutex);
484       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
485       V(mutex);
486       if (jcr->is_canceled()) {
487          if (jcr->SD_msg_chan_started) {
488             jcr->store_bsock->set_timed_out();
489             jcr->store_bsock->set_terminated();
490             sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL);
491          }
492          cancel_count++;
493       }
494       /* Give SD 30 seconds to clean up after cancel */
495       if (cancel_count == 6) {
496          break;
497       }
498    }
499    jcr->setJobStatus(JS_Terminated);
500 }
501
502 /*
503  * Send bootstrap file to Storage daemon.
504  *  This is used for restore, verify VolumeToCatalog, migration,
505  *    and copy Jobs.
506  */
507 bool send_bootstrap_file(JCR *jcr, BSOCK *sd)
508 {
509    FILE *bs;
510    char buf[1000];
511    const char *bootstrap = "bootstrap\n";
512
513    Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap);
514    if (!jcr->RestoreBootstrap) {
515       return true;
516    }
517    bs = fopen(jcr->RestoreBootstrap, "rb");
518    if (!bs) {
519       berrno be;
520       Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
521          jcr->RestoreBootstrap, be.bstrerror());
522       jcr->setJobStatus(JS_ErrorTerminated);
523       return false;
524    }
525    sd->fsend(bootstrap);
526    while (fgets(buf, sizeof(buf), bs)) {
527       sd->fsend("%s", buf);
528    }
529    sd->signal(BNET_EOD);
530    fclose(bs);
531    if (jcr->unlink_bsr) {
532       unlink(jcr->RestoreBootstrap);
533       jcr->unlink_bsr = false;
534    }
535    return true;
536 }
537
538
539 #ifdef needed
540 #define MAX_TRIES 30
541 #define WAIT_TIME 2
542 extern "C" void *device_thread(void *arg)
543 {
544    int i;
545    JCR *jcr;
546    DEVICE *dev;
547
548
549    pthread_detach(pthread_self());
550    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
551    for (i=0; i < MAX_TRIES; i++) {
552       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
553          Dmsg0(900, "Failed connecting to SD.\n");
554          continue;
555       }
556       LockRes();
557       foreach_res(dev, R_DEVICE) {
558          if (!update_device_res(jcr, dev)) {
559             Dmsg1(900, "Error updating device=%s\n", dev->name());
560          } else {
561             Dmsg1(900, "Updated Device=%s\n", dev->name());
562          }
563       }
564       UnlockRes();
565       free_bsock(jcr->store_bsock);
566       break;
567
568    }
569    free_jcr(jcr);
570    return NULL;
571 }
572
573 /*
574  * Start a thread to handle getting Device resource information
575  *  from SD. This is called once at startup of the Director.
576  */
577 void init_device_resources()
578 {
579    int status;
580    pthread_t thid;
581
582    Dmsg0(100, "Start Device thread.\n");
583    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
584       berrno be;
585       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
586    }
587 }
588 #endif