]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
Tweak debug
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of Kern Sibbald.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *
30  *   Bacula Director -- msgchan.c -- handles the message channel
31  *    to the Storage daemon and the File daemon.
32  *
33  *     Kern Sibbald, August MM
34  *
35  *    This routine runs as a thread and must be thread reentrant.
36  *
37  *  Basic tasks done here:
38  *    Open a message channel with the Storage daemon
39  *      to authenticate ourself and to pass the JobId.
40  *    Create a thread to interact with the Storage daemon
41  *      who returns a job status and requests Catalog services, etc.
42  *
43  *   Version $Id$
44  */
45
46 #include "bacula.h"
47 #include "dird.h"
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 /* Commands sent to Storage daemon */
52 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
53    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d SpoolSize=%s\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd;
80    STORE *store;
81    utime_t heart_beat;    
82
83    if (jcr->store_bsock) {
84       return true;                    /* already connected */
85    }
86
87    /* If there is a write storage use it */
88    if (jcr->wstore) {
89       store = jcr->wstore;
90    } else {
91       store = jcr->rstore;
92    }
93
94    if (store->heartbeat_interval) {
95       heart_beat = store->heartbeat_interval;
96    } else {           
97       heart_beat = director->heartbeat_interval;
98    }
99
100    /*
101     *  Open message channel with the Storage daemon
102     */
103    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
104       store->SDport);
105    sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat,
106           _("Storage daemon"), store->address,
107           NULL, store->SDport, verbose);
108    if (sd == NULL) {
109       return false;
110    }
111    sd->res = (RES *)store;        /* save pointer to other end */
112    jcr->store_bsock = sd;
113
114    if (!authenticate_storage_daemon(jcr, store)) {
115       sd->close();
116       jcr->store_bsock = NULL;
117       return false;
118    }
119    return true;
120 }
121
122 /*
123  * Here we ask the SD to send us the info for a 
124  *  particular device resource.
125  */
126 #ifdef needed
127 bool update_device_res(JCR *jcr, DEVICE *dev)
128 {
129    POOL_MEM device_name; 
130    BSOCK *sd;
131    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
132       return false;
133    }
134    sd = jcr->store_bsock;
135    pm_strcpy(device_name, dev->name());
136    bash_spaces(device_name);
137    sd->fsend(query_device, device_name.c_str());
138    Dmsg1(100, ">stored: %s\n", sd->msg);
139    /* The data is returned through Device_update */
140    if (bget_dirmsg(sd) <= 0) {
141       return false;
142    }
143    return true;
144 }
145 #endif
146
147 /*
148  * Start a job with the Storage daemon
149  */
150 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
151 {
152    bool ok = true;
153    STORE *storage;
154    BSOCK *sd;
155    char auth_key[100];
156    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
157    POOL_MEM job_name, client_name, fileset_name;
158    int copy = 0;
159    int stripe = 0;
160    char ed1[30], ed2[30];
161
162    sd = jcr->store_bsock;
163    /*
164     * Now send JobId and permissions, and get back the authorization key.
165     */
166    pm_strcpy(job_name, jcr->job->name());
167    bash_spaces(job_name);
168    pm_strcpy(client_name, jcr->client->name());
169    bash_spaces(client_name);
170    pm_strcpy(fileset_name, jcr->fileset->name());
171    bash_spaces(fileset_name);
172    if (jcr->fileset->MD5[0] == 0) {
173       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
174    }
175    /* If rescheduling, cancel the previous incarnation of this job
176     *  with the SD, which might be waiting on the FD connection.
177     *  If we do not cancel it the SD will not accept a new connection
178     *  for the same jobid.
179     */
180    if (jcr->reschedule_count) {
181       sd->fsend("cancel Job=%s\n", jcr->Job);
182       while (sd->recv() >= 0)
183          { }
184    } 
185    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
186              job_name.c_str(), client_name.c_str(), 
187              jcr->get_JobType(), jcr->get_JobLevel(),
188              fileset_name.c_str(), !jcr->pool->catalog_files,
189              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
190              jcr->write_part_after_job, jcr->job->PreferMountedVolumes,
191              edit_int64(jcr->spool_size, ed2));
192    Dmsg1(100, ">stored: %s\n", sd->msg);
193    if (bget_dirmsg(sd) > 0) {
194        Dmsg1(100, "<stored: %s", sd->msg);
195        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
196                   &jcr->VolSessionTime, &auth_key) != 3) {
197           Dmsg1(100, "BadJob=%s\n", sd->msg);
198           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
199           return 0;
200        } else {
201           jcr->sd_auth_key = bstrdup(auth_key);
202           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
203        }
204    } else {
205       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
206          sd->bstrerror());
207       return 0;
208    }
209
210    /*
211     * We have two loops here. The first comes from the 
212     *  Storage = associated with the Job, and we need 
213     *  to attach to each one.
214     * The inner loop loops over all the alternative devices
215     *  associated with each Storage. It selects the first
216     *  available one.
217     *
218     */
219    /* Do read side of storage daemon */
220    if (ok && rstore) {
221       /* For the moment, only migrate, copy and vbackup have rpool */
222       if (jcr->get_JobType() == JT_MIGRATE || jcr->get_JobType() == JT_COPY ||
223            (jcr->get_JobType() == JT_BACKUP && jcr->get_JobLevel() == L_VIRTUAL_FULL)) {
224          pm_strcpy(pool_type, jcr->rpool->pool_type);
225          pm_strcpy(pool_name, jcr->rpool->name());
226       } else {
227          pm_strcpy(pool_type, jcr->pool->pool_type);
228          pm_strcpy(pool_name, jcr->pool->name());
229       }
230       bash_spaces(pool_type);
231       bash_spaces(pool_name);
232       foreach_alist(storage, rstore) {
233          Dmsg1(100, "Rstore=%s\n", storage->name());
234          pm_strcpy(store_name, storage->name());
235          bash_spaces(store_name);
236          pm_strcpy(media_type, storage->media_type);
237          bash_spaces(media_type);
238          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
239                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
240          Dmsg1(100, "rstore >stored: %s", sd->msg);
241          DEVICE *dev;
242          /* Loop over alternative storage Devices until one is OK */
243          foreach_alist(dev, storage->device) {
244             pm_strcpy(device_name, dev->name());
245             bash_spaces(device_name);
246             sd->fsend(use_device, device_name.c_str());
247             Dmsg1(100, ">stored: %s", sd->msg);
248          }
249          sd->signal(BNET_EOD);           /* end of Devices */
250       }
251       sd->signal(BNET_EOD);              /* end of Storages */
252       if (bget_dirmsg(sd) > 0) {
253          Dmsg1(100, "<stored: %s", sd->msg);
254          /* ****FIXME**** save actual device name */
255          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
256       } else {
257          ok = false;
258       }
259    }
260
261    /* Do write side of storage daemon */
262    if (ok && wstore) {
263       pm_strcpy(pool_type, jcr->pool->pool_type);
264       pm_strcpy(pool_name, jcr->pool->name());
265       bash_spaces(pool_type);
266       bash_spaces(pool_name);
267       foreach_alist(storage, wstore) {
268          pm_strcpy(store_name, storage->name());
269          bash_spaces(store_name);
270          pm_strcpy(media_type, storage->media_type);
271          bash_spaces(media_type);
272          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
273                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
274
275          Dmsg1(100, "wstore >stored: %s", sd->msg);
276          DEVICE *dev;
277          /* Loop over alternative storage Devices until one is OK */
278          foreach_alist(dev, storage->device) {
279             pm_strcpy(device_name, dev->name());
280             bash_spaces(device_name);
281             sd->fsend(use_device, device_name.c_str());
282             Dmsg1(100, ">stored: %s", sd->msg);
283          }
284          sd->signal(BNET_EOD);           /* end of Devices */
285       }
286       sd->signal(BNET_EOD);              /* end of Storages */
287       if (bget_dirmsg(sd) > 0) {
288          Dmsg1(100, "<stored: %s", sd->msg);
289          /* ****FIXME**** save actual device name */
290          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
291       } else {
292          ok = false;
293       }
294    }
295    if (!ok) {
296       POOL_MEM err_msg;
297       if (sd->msg[0]) {
298          pm_strcpy(err_msg, sd->msg); /* save message */
299          Jmsg(jcr, M_FATAL, 0, _("\n"
300               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
301               device_name.c_str(), err_msg.c_str()/* sd->msg */);
302       } else { 
303          Jmsg(jcr, M_FATAL, 0, _("\n"
304               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
305               device_name.c_str());
306       }
307    } else {
308       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
309    }
310    return ok;
311 }
312
313 /*
314  * Start a thread to handle Storage daemon messages and
315  *  Catalog requests.
316  */
317 bool start_storage_daemon_message_thread(JCR *jcr)
318 {
319    int status;
320    pthread_t thid;
321
322    jcr->inc_use_count();              /* mark in use by msg thread */
323    jcr->sd_msg_thread_done = false;
324    jcr->SD_msg_chan = 0;
325    Dmsg0(100, "Start SD msg_thread.\n");
326    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
327       berrno be;
328       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
329    }
330    /* Wait for thread to start */
331    while (jcr->SD_msg_chan == 0) {
332       bmicrosleep(0, 50);
333       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
334          return false;
335       }
336    }
337    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
338    return true;
339 }
340
341 extern "C" void msg_thread_cleanup(void *arg)
342 {
343    JCR *jcr = (JCR *)arg;
344    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
345    jcr->sd_msg_thread_done = true;
346    jcr->SD_msg_chan = 0;
347    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
348    Dmsg2(100, "=== End msg_thread. JobId=%d usecnt=%d\n", jcr->JobId, jcr->use_count());
349    free_jcr(jcr);                     /* release jcr */
350    db_thread_cleanup();               /* remove thread specific data */
351 }
352
353 /*
354  * Handle the message channel (i.e. requests from the
355  *  Storage daemon).
356  * Note, we are running in a separate thread.
357  */
358 extern "C" void *msg_thread(void *arg)
359 {
360    JCR *jcr = (JCR *)arg;
361    BSOCK *sd;
362    int JobStatus;
363    char Job[MAX_NAME_LENGTH];
364    uint32_t JobFiles;
365    uint64_t JobBytes;
366    int stat;
367
368    pthread_detach(pthread_self());
369    set_jcr_in_tsd(jcr);
370    jcr->SD_msg_chan = pthread_self();
371    pthread_cleanup_push(msg_thread_cleanup, arg);
372    sd = jcr->store_bsock;
373
374    /* Read the Storage daemon's output.
375     */
376    Dmsg0(100, "Start msg_thread loop\n");
377    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
378       Dmsg1(400, "<stored: %s", sd->msg);
379       if (sscanf(sd->msg, Job_start, Job) == 1) {
380          continue;
381       }
382       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
383                  &JobBytes)) == 4) {
384          jcr->SDJobStatus = JobStatus; /* termination status */
385          jcr->SDJobFiles = JobFiles;
386          jcr->SDJobBytes = JobBytes;
387          break;
388       }
389       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
390    }
391    if (is_bnet_error(sd)) {
392       jcr->SDJobStatus = JS_ErrorTerminated;
393    }
394    pthread_cleanup_pop(1);            /* remove and execute the handler */
395    return NULL;
396 }
397
398 void wait_for_storage_daemon_termination(JCR *jcr)
399 {
400    int cancel_count = 0;
401    /* Now wait for Storage daemon to terminate our message thread */
402    while (!jcr->sd_msg_thread_done) {
403       struct timeval tv;
404       struct timezone tz;
405       struct timespec timeout;
406
407       gettimeofday(&tv, &tz);
408       timeout.tv_nsec = 0;
409       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
410       Dmsg0(400, "I'm waiting for message thread termination.\n");
411       P(mutex);
412       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
413       V(mutex);
414       if (job_canceled(jcr)) {
415          if (jcr->SD_msg_chan) {
416             jcr->store_bsock->set_timed_out();
417             jcr->store_bsock->set_terminated();
418             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
419             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
420          }
421          cancel_count++;
422       }
423       /* Give SD 30 seconds to clean up after cancel */
424       if (cancel_count == 6) {
425          break;
426       }
427    }
428    set_jcr_job_status(jcr, JS_Terminated);
429 }
430
431 #ifdef needed
432 #define MAX_TRIES 30
433 #define WAIT_TIME 2
434 extern "C" void *device_thread(void *arg)
435 {
436    int i;
437    JCR *jcr;
438    DEVICE *dev;
439
440
441    pthread_detach(pthread_self());
442    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
443    for (i=0; i < MAX_TRIES; i++) {
444       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
445          Dmsg0(900, "Failed connecting to SD.\n");
446          continue;
447       }
448       LockRes();
449       foreach_res(dev, R_DEVICE) {
450          if (!update_device_res(jcr, dev)) {
451             Dmsg1(900, "Error updating device=%s\n", dev->name());
452          } else {
453             Dmsg1(900, "Updated Device=%s\n", dev->name());
454          }
455       }
456       UnlockRes();
457       bnet_close(jcr->store_bsock);
458       jcr->store_bsock = NULL;
459       break;
460
461    }
462    free_jcr(jcr);
463    return NULL;
464 }
465
466 /*
467  * Start a thread to handle getting Device resource information
468  *  from SD. This is called once at startup of the Director.
469  */
470 void init_device_resources()
471 {
472    int status;
473    pthread_t thid;
474
475    Dmsg0(100, "Start Device thread.\n");
476    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
477       berrno be;
478       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
479    }
480 }
481 #endif