]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
backport code from master
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2009 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version three of the GNU Affero General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU Affero General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *
30  *   Bacula Director -- msgchan.c -- handles the message channel
31  *    to the Storage daemon and the File daemon.
32  *
33  *     Kern Sibbald, August MM
34  *
35  *    This routine runs as a thread and must be thread reentrant.
36  *
37  *  Basic tasks done here:
38  *    Open a message channel with the Storage daemon
39  *      to authenticate ourself and to pass the JobId.
40  *    Create a thread to interact with the Storage daemon
41  *      who returns a job status and requests Catalog services, etc.
42  *
43  */
44
45 #include "bacula.h"
46 #include "dird.h"
47
48 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
49
50 /* Commands sent to Storage daemon */
51 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
52    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
53    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s "
54    "rerunning=%d VolSessionId=%d VolSessionTime=%d\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd = new_bsock();
80    STORE *store;
81    utime_t heart_beat;    
82
83    if (jcr->store_bsock) {
84       return true;                    /* already connected */
85    }
86
87    /* If there is a write storage use it */
88    if (jcr->wstore) {
89       store = jcr->wstore;
90    } else {
91       store = jcr->rstore;
92    }
93
94    if (store->heartbeat_interval) {
95       heart_beat = store->heartbeat_interval;
96    } else {           
97       heart_beat = director->heartbeat_interval;
98    }
99
100    /*
101     *  Open message channel with the Storage daemon
102     */
103    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
104       store->SDport);
105    sd->set_source_address(director->DIRsrc_addr);
106    if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
107          store->address, NULL, store->SDport, verbose)) {
108       sd->destroy();
109       sd = NULL;
110    }
111
112    if (sd == NULL) {
113       return false;
114    }
115    sd->res = (RES *)store;        /* save pointer to other end */
116    jcr->store_bsock = sd;
117
118    if (!authenticate_storage_daemon(jcr, store)) {
119       sd->close();
120       jcr->store_bsock = NULL;
121       return false;
122    }
123    return true;
124 }
125
126 /*
127  * Here we ask the SD to send us the info for a 
128  *  particular device resource.
129  */
130 #ifdef xxx
131 bool update_device_res(JCR *jcr, DEVICE *dev)
132 {
133    POOL_MEM device_name; 
134    BSOCK *sd;
135    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
136       return false;
137    }
138    sd = jcr->store_bsock;
139    pm_strcpy(device_name, dev->name());
140    bash_spaces(device_name);
141    sd->fsend(query_device, device_name.c_str());
142    Dmsg1(100, ">stored: %s\n", sd->msg);
143    /* The data is returned through Device_update */
144    if (bget_dirmsg(sd) <= 0) {
145       return false;
146    }
147    return true;
148 }
149 #endif
150
151 static char OKbootstrap[] = "3000 OK bootstrap\n";
152
153 /*
154  * Start a job with the Storage daemon
155  */
156 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
157 {
158    bool ok = true;
159    STORE *storage;
160    BSOCK *sd;
161    char auth_key[100];
162    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
163    POOL_MEM job_name, client_name, fileset_name;
164    int copy = 0;
165    int stripe = 0;
166    char ed1[30], ed2[30];
167
168    sd = jcr->store_bsock;
169    /*
170     * Now send JobId and permissions, and get back the authorization key.
171     */
172    pm_strcpy(job_name, jcr->job->name());
173    bash_spaces(job_name);
174    pm_strcpy(client_name, jcr->client->name());
175    bash_spaces(client_name);
176    pm_strcpy(fileset_name, jcr->fileset->name());
177    bash_spaces(fileset_name);
178    if (jcr->fileset->MD5[0] == 0) {
179       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
180    }
181    /* If rescheduling, cancel the previous incarnation of this job
182     *  with the SD, which might be waiting on the FD connection.
183     *  If we do not cancel it the SD will not accept a new connection
184     *  for the same jobid.
185     */
186    if (jcr->reschedule_count) {
187       sd->fsend("cancel Job=%s\n", jcr->Job);
188       while (sd->recv() >= 0)
189          { }
190    } 
191    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
192              job_name.c_str(), client_name.c_str(), 
193              jcr->getJobType(), jcr->getJobLevel(),
194              fileset_name.c_str(), !jcr->pool->catalog_files,
195              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
196              jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
197              edit_int64(jcr->spool_size, ed2), jcr->rerunning,
198              jcr->VolSessionId, jcr->VolSessionTime);
199    Dmsg1(100, ">stored: %s", sd->msg);
200    if (bget_dirmsg(sd) > 0) {
201        Dmsg1(100, "<stored: %s", sd->msg);
202        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
203                   &jcr->VolSessionTime, &auth_key) != 3) {
204           Dmsg1(100, "BadJob=%s\n", sd->msg);
205           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
206           return false;
207        } else {
208           bfree_and_null(jcr->sd_auth_key);
209           jcr->sd_auth_key = bstrdup(auth_key);
210           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
211        }
212    } else {
213       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
214          sd->bstrerror());
215       return false;
216    }
217
218    if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
219        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
220       return false;
221    }
222
223    /*
224     * We have two loops here. The first comes from the 
225     *  Storage = associated with the Job, and we need 
226     *  to attach to each one.
227     * The inner loop loops over all the alternative devices
228     *  associated with each Storage. It selects the first
229     *  available one.
230     *
231     */
232    /* Do read side of storage daemon */
233    if (ok && rstore) {
234       /* For the moment, only migrate, copy and vbackup have rpool */
235       if (jcr->is_JobType(JT_MIGRATE) || jcr->is_JobType(JT_COPY) ||
236            (jcr->is_JobType(JT_BACKUP) && jcr->is_JobLevel(L_VIRTUAL_FULL))) {
237          pm_strcpy(pool_type, jcr->rpool->pool_type);
238          pm_strcpy(pool_name, jcr->rpool->name());
239       } else {
240          pm_strcpy(pool_type, jcr->pool->pool_type);
241          pm_strcpy(pool_name, jcr->pool->name());
242       }
243       bash_spaces(pool_type);
244       bash_spaces(pool_name);
245       foreach_alist(storage, rstore) {
246          Dmsg1(100, "Rstore=%s\n", storage->name());
247          pm_strcpy(store_name, storage->name());
248          bash_spaces(store_name);
249          pm_strcpy(media_type, storage->media_type);
250          bash_spaces(media_type);
251          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
252                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
253          Dmsg1(100, "rstore >stored: %s", sd->msg);
254          DEVICE *dev;
255          /* Loop over alternative storage Devices until one is OK */
256          foreach_alist(dev, storage->device) {
257             pm_strcpy(device_name, dev->name());
258             bash_spaces(device_name);
259             sd->fsend(use_device, device_name.c_str());
260             Dmsg1(100, ">stored: %s", sd->msg);
261          }
262          sd->signal(BNET_EOD);           /* end of Devices */
263       }
264       sd->signal(BNET_EOD);              /* end of Storages */
265       if (bget_dirmsg(sd) > 0) {
266          Dmsg1(100, "<stored: %s", sd->msg);
267          /* ****FIXME**** save actual device name */
268          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
269       } else {
270          ok = false;
271       }
272    }
273
274    /* Do write side of storage daemon */
275    if (ok && wstore) {
276       pm_strcpy(pool_type, jcr->pool->pool_type);
277       pm_strcpy(pool_name, jcr->pool->name());
278       bash_spaces(pool_type);
279       bash_spaces(pool_name);
280       foreach_alist(storage, wstore) {
281          pm_strcpy(store_name, storage->name());
282          bash_spaces(store_name);
283          pm_strcpy(media_type, storage->media_type);
284          bash_spaces(media_type);
285          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
286                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
287
288          Dmsg1(100, "wstore >stored: %s", sd->msg);
289          DEVICE *dev;
290          /* Loop over alternative storage Devices until one is OK */
291          foreach_alist(dev, storage->device) {
292             pm_strcpy(device_name, dev->name());
293             bash_spaces(device_name);
294             sd->fsend(use_device, device_name.c_str());
295             Dmsg1(100, ">stored: %s", sd->msg);
296          }
297          sd->signal(BNET_EOD);           /* end of Devices */
298       }
299       sd->signal(BNET_EOD);              /* end of Storages */
300       if (bget_dirmsg(sd) > 0) {
301          Dmsg1(100, "<stored: %s", sd->msg);
302          /* ****FIXME**** save actual device name */
303          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
304       } else {
305          ok = false;
306       }
307    }
308    if (!ok) {
309       POOL_MEM err_msg;
310       if (sd->msg[0]) {
311          pm_strcpy(err_msg, sd->msg); /* save message */
312          Jmsg(jcr, M_FATAL, 0, _("\n"
313               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
314               device_name.c_str(), err_msg.c_str()/* sd->msg */);
315       } else { 
316          Jmsg(jcr, M_FATAL, 0, _("\n"
317               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
318               device_name.c_str());
319       }
320    } else {
321       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
322    }
323    return ok;
324 }
325
326 /*
327  * Start a thread to handle Storage daemon messages and
328  *  Catalog requests.
329  */
330 bool start_storage_daemon_message_thread(JCR *jcr)
331 {
332    int status;
333    pthread_t thid;
334
335    jcr->inc_use_count();              /* mark in use by msg thread */
336    jcr->sd_msg_thread_done = false;
337    jcr->SD_msg_chan = 0;
338    Dmsg0(100, "Start SD msg_thread.\n");
339    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
340       berrno be;
341       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
342    }
343    /* Wait for thread to start */
344    while (jcr->SD_msg_chan == 0) {
345       bmicrosleep(0, 50);
346       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
347          return false;
348       }
349    }
350    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
351    return true;
352 }
353
354 extern "C" void msg_thread_cleanup(void *arg)
355 {
356    JCR *jcr = (JCR *)arg;
357    db_end_transaction(jcr, jcr->db);        /* terminate any open transaction */
358    jcr->lock();
359    jcr->sd_msg_thread_done = true;
360    jcr->SD_msg_chan = 0;
361    jcr->unlock();
362    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
363    Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
364    db_thread_cleanup(jcr->db);              /* remove thread specific data */
365    free_jcr(jcr);                           /* release jcr */
366 }
367
368 /*
369  * Handle the message channel (i.e. requests from the
370  *  Storage daemon).
371  * Note, we are running in a separate thread.
372  */
373 extern "C" void *msg_thread(void *arg)
374 {
375    JCR *jcr = (JCR *)arg;
376    BSOCK *sd;
377    int JobStatus;
378    int n;
379    char Job[MAX_NAME_LENGTH];
380    uint32_t JobFiles, JobErrors;
381    uint64_t JobBytes;
382
383    pthread_detach(pthread_self());
384    set_jcr_in_tsd(jcr);
385    jcr->SD_msg_chan = pthread_self();
386    pthread_cleanup_push(msg_thread_cleanup, arg);
387    sd = jcr->store_bsock;
388
389    /* Read the Storage daemon's output.
390     */
391    Dmsg0(100, "Start msg_thread loop\n");
392    n = 0;
393    while (!job_canceled(jcr) && (n=bget_dirmsg(sd)) >= 0) {
394       Dmsg1(400, "<stored: %s", sd->msg);
395       if (sscanf(sd->msg, Job_start, Job) == 1) {
396          continue;
397       }
398       if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
399                  &JobBytes, &JobErrors) == 5) {
400          jcr->SDJobStatus = JobStatus; /* termination status */
401          jcr->SDJobFiles = JobFiles;
402          jcr->SDJobBytes = JobBytes;
403          jcr->SDErrors = JobErrors;
404          break;
405       }
406       Dmsg1(400, "end loop use=%d\n", jcr->use_count());
407    }
408    if (n == BNET_HARDEOF) {
409       /*
410        * This probably should be M_FATAL, but I am not 100% sure
411        *  that this return *always* corresponds to a dropped line.
412        */
413       Qmsg(jcr, M_ERROR, 0, _("Director's comm line to SD dropped.\n"));
414    }
415    if (is_bnet_error(sd)) {
416       jcr->SDJobStatus = JS_ErrorTerminated;
417    }
418    pthread_cleanup_pop(1);            /* remove and execute the handler */
419    return NULL;
420 }
421
422 void wait_for_storage_daemon_termination(JCR *jcr)
423 {
424    int cancel_count = 0;
425    /* Now wait for Storage daemon to terminate our message thread */
426    while (!jcr->sd_msg_thread_done) {
427       struct timeval tv;
428       struct timezone tz;
429       struct timespec timeout;
430
431       gettimeofday(&tv, &tz);
432       timeout.tv_nsec = 0;
433       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
434       Dmsg0(400, "I'm waiting for message thread termination.\n");
435       P(mutex);
436       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
437       V(mutex);
438       if (jcr->is_canceled()) {
439          if (jcr->SD_msg_chan) {
440             jcr->store_bsock->set_timed_out();
441             jcr->store_bsock->set_terminated();
442             sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL);
443          }
444          cancel_count++;
445       }
446       /* Give SD 30 seconds to clean up after cancel */
447       if (cancel_count == 6) {
448          break;
449       }
450    }
451    jcr->setJobStatus(JS_Terminated);
452 }
453
454 /*
455  * Send bootstrap file to Storage daemon.
456  *  This is used for restore, verify VolumeToCatalog, migration,
457  *    and copy Jobs.
458  */
459 bool send_bootstrap_file(JCR *jcr, BSOCK *sd)
460 {
461    FILE *bs;
462    char buf[1000];
463    const char *bootstrap = "bootstrap\n";
464
465    Dmsg1(400, "send_bootstrap_file: %s\n", jcr->RestoreBootstrap);
466    if (!jcr->RestoreBootstrap) {
467       return true;
468    }
469    bs = fopen(jcr->RestoreBootstrap, "rb");
470    if (!bs) {
471       berrno be;
472       Jmsg(jcr, M_FATAL, 0, _("Could not open bootstrap file %s: ERR=%s\n"),
473          jcr->RestoreBootstrap, be.bstrerror());
474       jcr->setJobStatus(JS_ErrorTerminated);
475       return false;
476    }
477    sd->fsend(bootstrap);
478    while (fgets(buf, sizeof(buf), bs)) {
479       sd->fsend("%s", buf);
480    }
481    sd->signal(BNET_EOD);
482    fclose(bs);
483    if (jcr->unlink_bsr) {
484       unlink(jcr->RestoreBootstrap);
485       jcr->unlink_bsr = false;
486    }                         
487    return true;
488 }
489
490
491 #ifdef needed
492 #define MAX_TRIES 30
493 #define WAIT_TIME 2
494 extern "C" void *device_thread(void *arg)
495 {
496    int i;
497    JCR *jcr;
498    DEVICE *dev;
499
500
501    pthread_detach(pthread_self());
502    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
503    for (i=0; i < MAX_TRIES; i++) {
504       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
505          Dmsg0(900, "Failed connecting to SD.\n");
506          continue;
507       }
508       LockRes();
509       foreach_res(dev, R_DEVICE) {
510          if (!update_device_res(jcr, dev)) {
511             Dmsg1(900, "Error updating device=%s\n", dev->name());
512          } else {
513             Dmsg1(900, "Updated Device=%s\n", dev->name());
514          }
515       }
516       UnlockRes();
517       bnet_close(jcr->store_bsock);
518       jcr->store_bsock = NULL;
519       break;
520
521    }
522    free_jcr(jcr);
523    return NULL;
524 }
525
526 /*
527  * Start a thread to handle getting Device resource information
528  *  from SD. This is called once at startup of the Director.
529  */
530 void init_device_resources()
531 {
532    int status;
533    pthread_t thid;
534
535    Dmsg0(100, "Start Device thread.\n");
536    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
537       berrno be;
538       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
539    }
540 }
541 #endif