]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
Fix bug #1307 AllowHigherDuplicates=no prevents automatic job escalation
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *
30  *   Bacula Director -- msgchan.c -- handles the message channel
31  *    to the Storage daemon and the File daemon.
32  *
33  *     Kern Sibbald, August MM
34  *
35  *    This routine runs as a thread and must be thread reentrant.
36  *
37  *  Basic tasks done here:
38  *    Open a message channel with the Storage daemon
39  *      to authenticate ourself and to pass the JobId.
40  *    Create a thread to interact with the Storage daemon
41  *      who returns a job status and requests Catalog services, etc.
42  *
43  *   Version $Id$
44  */
45
46 #include "bacula.h"
47 #include "dird.h"
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 /* Commands sent to Storage daemon */
52 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
53    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld JobErrors=%u\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd = new_bsock();
80    STORE *store;
81    utime_t heart_beat;    
82
83    if (jcr->store_bsock) {
84       return true;                    /* already connected */
85    }
86
87    /* If there is a write storage use it */
88    if (jcr->wstore) {
89       store = jcr->wstore;
90    } else {
91       store = jcr->rstore;
92    }
93
94    if (store->heartbeat_interval) {
95       heart_beat = store->heartbeat_interval;
96    } else {           
97       heart_beat = director->heartbeat_interval;
98    }
99
100    /*
101     *  Open message channel with the Storage daemon
102     */
103    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
104       store->SDport);
105    sd->set_source_address(director->DIRsrc_addr);
106    if (!sd->connect(jcr, retry_interval, max_retry_time, heart_beat, _("Storage daemon"),
107          store->address, NULL, store->SDport, verbose)) {
108       sd->destroy();
109       sd = NULL;
110    }
111
112    if (sd == NULL) {
113       return false;
114    }
115    sd->res = (RES *)store;        /* save pointer to other end */
116    jcr->store_bsock = sd;
117
118    if (!authenticate_storage_daemon(jcr, store)) {
119       sd->close();
120       jcr->store_bsock = NULL;
121       return false;
122    }
123    return true;
124 }
125
126 /*
127  * Here we ask the SD to send us the info for a 
128  *  particular device resource.
129  */
130 #ifdef xxx
131 bool update_device_res(JCR *jcr, DEVICE *dev)
132 {
133    POOL_MEM device_name; 
134    BSOCK *sd;
135    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
136       return false;
137    }
138    sd = jcr->store_bsock;
139    pm_strcpy(device_name, dev->name());
140    bash_spaces(device_name);
141    sd->fsend(query_device, device_name.c_str());
142    Dmsg1(100, ">stored: %s\n", sd->msg);
143    /* The data is returned through Device_update */
144    if (bget_dirmsg(sd) <= 0) {
145       return false;
146    }
147    return true;
148 }
149 #endif
150
151 static char OKbootstrap[] = "3000 OK bootstrap\n";
152
153 /*
154  * Start a job with the Storage daemon
155  */
156 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore, bool send_bsr)
157 {
158    bool ok = true;
159    STORE *storage;
160    BSOCK *sd;
161    char auth_key[100];
162    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
163    POOL_MEM job_name, client_name, fileset_name;
164    int copy = 0;
165    int stripe = 0;
166    char ed1[30], ed2[30];
167
168    sd = jcr->store_bsock;
169    /*
170     * Now send JobId and permissions, and get back the authorization key.
171     */
172    pm_strcpy(job_name, jcr->job->name());
173    bash_spaces(job_name);
174    pm_strcpy(client_name, jcr->client->name());
175    bash_spaces(client_name);
176    pm_strcpy(fileset_name, jcr->fileset->name());
177    bash_spaces(fileset_name);
178    if (jcr->fileset->MD5[0] == 0) {
179       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
180    }
181    /* If rescheduling, cancel the previous incarnation of this job
182     *  with the SD, which might be waiting on the FD connection.
183     *  If we do not cancel it the SD will not accept a new connection
184     *  for the same jobid.
185     */
186    if (jcr->reschedule_count) {
187       sd->fsend("cancel Job=%s\n", jcr->Job);
188       while (sd->recv() >= 0)
189          { }
190    } 
191    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
192              job_name.c_str(), client_name.c_str(), 
193              jcr->get_JobType(), jcr->get_JobLevel(),
194              fileset_name.c_str(), !jcr->pool->catalog_files,
195              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
196              jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
197              edit_int64(jcr->spool_size, ed2));
198    Dmsg1(100, ">stored: %s", sd->msg);
199    if (bget_dirmsg(sd) > 0) {
200        Dmsg1(100, "<stored: %s", sd->msg);
201        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
202                   &jcr->VolSessionTime, &auth_key) != 3) {
203           Dmsg1(100, "BadJob=%s\n", sd->msg);
204           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
205           return false;
206        } else {
207           jcr->sd_auth_key = bstrdup(auth_key);
208           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
209        }
210    } else {
211       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
212          sd->bstrerror());
213       return false;
214    }
215
216    if (send_bsr && (!send_bootstrap_file(jcr, sd) ||
217        !response(jcr, sd, OKbootstrap, "Bootstrap", DISPLAY_ERROR))) {
218       return false;
219    }
220
221    /*
222     * We have two loops here. The first comes from the 
223     *  Storage = associated with the Job, and we need 
224     *  to attach to each one.
225     * The inner loop loops over all the alternative devices
226     *  associated with each Storage. It selects the first
227     *  available one.
228     *
229     */
230    /* Do read side of storage daemon */
231    if (ok && rstore) {
232       /* For the moment, only migrate, copy and vbackup have rpool */
233       if (jcr->get_JobType() == JT_MIGRATE || jcr->get_JobType() == JT_COPY ||
234            (jcr->get_JobType() == JT_BACKUP && jcr->get_JobLevel() == L_VIRTUAL_FULL)) {
235          pm_strcpy(pool_type, jcr->rpool->pool_type);
236          pm_strcpy(pool_name, jcr->rpool->name());
237       } else {
238          pm_strcpy(pool_type, jcr->pool->pool_type);
239          pm_strcpy(pool_name, jcr->pool->name());
240       }
241       bash_spaces(pool_type);
242       bash_spaces(pool_name);
243       foreach_alist(storage, rstore) {
244          Dmsg1(100, "Rstore=%s\n", storage->name());
245          pm_strcpy(store_name, storage->name());
246          bash_spaces(store_name);
247          pm_strcpy(media_type, storage->media_type);
248          bash_spaces(media_type);
249          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
250                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
251          Dmsg1(100, "rstore >stored: %s", sd->msg);
252          DEVICE *dev;
253          /* Loop over alternative storage Devices until one is OK */
254          foreach_alist(dev, storage->device) {
255             pm_strcpy(device_name, dev->name());
256             bash_spaces(device_name);
257             sd->fsend(use_device, device_name.c_str());
258             Dmsg1(100, ">stored: %s", sd->msg);
259          }
260          sd->signal(BNET_EOD);           /* end of Devices */
261       }
262       sd->signal(BNET_EOD);              /* end of Storages */
263       if (bget_dirmsg(sd) > 0) {
264          Dmsg1(100, "<stored: %s", sd->msg);
265          /* ****FIXME**** save actual device name */
266          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
267       } else {
268          ok = false;
269       }
270    }
271
272    /* Do write side of storage daemon */
273    if (ok && wstore) {
274       pm_strcpy(pool_type, jcr->pool->pool_type);
275       pm_strcpy(pool_name, jcr->pool->name());
276       bash_spaces(pool_type);
277       bash_spaces(pool_name);
278       foreach_alist(storage, wstore) {
279          pm_strcpy(store_name, storage->name());
280          bash_spaces(store_name);
281          pm_strcpy(media_type, storage->media_type);
282          bash_spaces(media_type);
283          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
284                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
285
286          Dmsg1(100, "wstore >stored: %s", sd->msg);
287          DEVICE *dev;
288          /* Loop over alternative storage Devices until one is OK */
289          foreach_alist(dev, storage->device) {
290             pm_strcpy(device_name, dev->name());
291             bash_spaces(device_name);
292             sd->fsend(use_device, device_name.c_str());
293             Dmsg1(100, ">stored: %s", sd->msg);
294          }
295          sd->signal(BNET_EOD);           /* end of Devices */
296       }
297       sd->signal(BNET_EOD);              /* end of Storages */
298       if (bget_dirmsg(sd) > 0) {
299          Dmsg1(100, "<stored: %s", sd->msg);
300          /* ****FIXME**** save actual device name */
301          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
302       } else {
303          ok = false;
304       }
305    }
306    if (!ok) {
307       POOL_MEM err_msg;
308       if (sd->msg[0]) {
309          pm_strcpy(err_msg, sd->msg); /* save message */
310          Jmsg(jcr, M_FATAL, 0, _("\n"
311               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
312               device_name.c_str(), err_msg.c_str()/* sd->msg */);
313       } else { 
314          Jmsg(jcr, M_FATAL, 0, _("\n"
315               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
316               device_name.c_str());
317       }
318    } else {
319       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
320    }
321    return ok;
322 }
323
324 /*
325  * Start a thread to handle Storage daemon messages and
326  *  Catalog requests.
327  */
328 bool start_storage_daemon_message_thread(JCR *jcr)
329 {
330    int status;
331    pthread_t thid;
332
333    jcr->inc_use_count();              /* mark in use by msg thread */
334    jcr->sd_msg_thread_done = false;
335    jcr->SD_msg_chan = 0;
336    Dmsg0(100, "Start SD msg_thread.\n");
337    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
338       berrno be;
339       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
340    }
341    /* Wait for thread to start */
342    while (jcr->SD_msg_chan == 0) {
343       bmicrosleep(0, 50);
344       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
345          return false;
346       }
347    }
348    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
349    return true;
350 }
351
352 extern "C" void msg_thread_cleanup(void *arg)
353 {
354    JCR *jcr = (JCR *)arg;
355    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
356    jcr->sd_msg_thread_done = true;
357    jcr->SD_msg_chan = 0;
358    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
359    Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
360    free_jcr(jcr);                     /* release jcr */
361    db_thread_cleanup();               /* remove thread specific data */
362 }
363
364 /*
365  * Handle the message channel (i.e. requests from the
366  *  Storage daemon).
367  * Note, we are running in a separate thread.
368  */
369 extern "C" void *msg_thread(void *arg)
370 {
371    JCR *jcr = (JCR *)arg;
372    BSOCK *sd;
373    int JobStatus;
374    char Job[MAX_NAME_LENGTH];
375    uint32_t JobFiles, JobErrors;
376    uint64_t JobBytes;
377
378    pthread_detach(pthread_self());
379    set_jcr_in_tsd(jcr);
380    jcr->SD_msg_chan = pthread_self();
381    pthread_cleanup_push(msg_thread_cleanup, arg);
382    sd = jcr->store_bsock;
383
384    /* Read the Storage daemon's output.
385     */
386    Dmsg0(100, "Start msg_thread loop\n");
387    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
388       Dmsg1(400, "<stored: %s", sd->msg);
389       if (sscanf(sd->msg, Job_start, Job) == 1) {
390          continue;
391       }
392       if (sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
393                  &JobBytes, &JobErrors) == 5) {
394          jcr->SDJobStatus = JobStatus; /* termination status */
395          jcr->SDJobFiles = JobFiles;
396          jcr->SDJobBytes = JobBytes;
397          jcr->SDErrors = JobErrors;
398          break;
399       }
400       Dmsg1(400, "end loop use=%d\n", jcr->use_count());
401    }
402    if (is_bnet_error(sd)) {
403       jcr->SDJobStatus = JS_ErrorTerminated;
404    }
405    pthread_cleanup_pop(1);            /* remove and execute the handler */
406    return NULL;
407 }
408
409 void wait_for_storage_daemon_termination(JCR *jcr)
410 {
411    int cancel_count = 0;
412    /* Now wait for Storage daemon to terminate our message thread */
413    while (!jcr->sd_msg_thread_done) {
414       struct timeval tv;
415       struct timezone tz;
416       struct timespec timeout;
417
418       gettimeofday(&tv, &tz);
419       timeout.tv_nsec = 0;
420       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
421       Dmsg0(400, "I'm waiting for message thread termination.\n");
422       P(mutex);
423       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
424       V(mutex);
425       if (job_canceled(jcr)) {
426          if (jcr->SD_msg_chan) {
427             jcr->store_bsock->set_timed_out();
428             jcr->store_bsock->set_terminated();
429             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
430             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
431          }
432          cancel_count++;
433       }
434       /* Give SD 30 seconds to clean up after cancel */
435       if (cancel_count == 6) {
436          break;
437       }
438    }
439    set_jcr_job_status(jcr, JS_Terminated);
440 }
441
442 #ifdef needed
443 #define MAX_TRIES 30
444 #define WAIT_TIME 2
445 extern "C" void *device_thread(void *arg)
446 {
447    int i;
448    JCR *jcr;
449    DEVICE *dev;
450
451
452    pthread_detach(pthread_self());
453    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
454    for (i=0; i < MAX_TRIES; i++) {
455       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
456          Dmsg0(900, "Failed connecting to SD.\n");
457          continue;
458       }
459       LockRes();
460       foreach_res(dev, R_DEVICE) {
461          if (!update_device_res(jcr, dev)) {
462             Dmsg1(900, "Error updating device=%s\n", dev->name());
463          } else {
464             Dmsg1(900, "Updated Device=%s\n", dev->name());
465          }
466       }
467       UnlockRes();
468       bnet_close(jcr->store_bsock);
469       jcr->store_bsock = NULL;
470       break;
471
472    }
473    free_jcr(jcr);
474    return NULL;
475 }
476
477 /*
478  * Start a thread to handle getting Device resource information
479  *  from SD. This is called once at startup of the Director.
480  */
481 void init_device_resources()
482 {
483    int status;
484    pthread_t thid;
485
486    Dmsg0(100, "Start Device thread.\n");
487    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
488       berrno be;
489       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
490    }
491 }
492 #endif