]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
kes Add code to help Dan debug 2drive-incremental-2disk test.
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *
30  *   Bacula Director -- msgchan.c -- handles the message channel
31  *    to the Storage daemon and the File daemon.
32  *
33  *     Kern Sibbald, August MM
34  *
35  *    This routine runs as a thread and must be thread reentrant.
36  *
37  *  Basic tasks done here:
38  *    Open a message channel with the Storage daemon
39  *      to authenticate ourself and to pass the JobId.
40  *    Create a thread to interact with the Storage daemon
41  *      who returns a job status and requests Catalog services, etc.
42  *
43  *   Version $Id$
44  */
45
46 #include "bacula.h"
47 #include "dird.h"
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 /* Commands sent to Storage daemon */
52 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
53    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd;
80    STORE *store;
81    utime_t heart_beat;    
82
83    if (jcr->store_bsock) {
84       return true;                    /* already connected */
85    }
86
87    /* If there is a write storage use it */
88    if (jcr->wstore) {
89       store = jcr->wstore;
90    } else {
91       store = jcr->rstore;
92    }
93
94    if (store->heartbeat_interval) {
95       heart_beat = store->heartbeat_interval;
96    } else {           
97       heart_beat = director->heartbeat_interval;
98    }
99
100    /*
101     *  Open message channel with the Storage daemon
102     */
103    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
104       store->SDport);
105    sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat,
106           _("Storage daemon"), store->address,
107           NULL, store->SDport, verbose);
108    if (sd == NULL) {
109       return false;
110    }
111    sd->res = (RES *)store;        /* save pointer to other end */
112    jcr->store_bsock = sd;
113
114    if (!authenticate_storage_daemon(jcr, store)) {
115       sd->close();
116       jcr->store_bsock = NULL;
117       return false;
118    }
119    return true;
120 }
121
122 /*
123  * Here we ask the SD to send us the info for a 
124  *  particular device resource.
125  */
126 #ifdef needed
127 bool update_device_res(JCR *jcr, DEVICE *dev)
128 {
129    POOL_MEM device_name; 
130    BSOCK *sd;
131    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
132       return false;
133    }
134    sd = jcr->store_bsock;
135    pm_strcpy(device_name, dev->name());
136    bash_spaces(device_name);
137    sd->fsend(query_device, device_name.c_str());
138    Dmsg1(100, ">stored: %s\n", sd->msg);
139    /* The data is returned through Device_update */
140    if (bget_dirmsg(sd) <= 0) {
141       return false;
142    }
143    return true;
144 }
145 #endif
146
147 /*
148  * Start a job with the Storage daemon
149  */
150 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
151 {
152    bool ok = true;
153    STORE *storage;
154    BSOCK *sd;
155    char auth_key[100];
156    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
157    POOL_MEM job_name, client_name, fileset_name;
158    int copy = 0;
159    int stripe = 0;
160    char ed1[30];
161
162    sd = jcr->store_bsock;
163    /*
164     * Now send JobId and permissions, and get back the authorization key.
165     */
166    pm_strcpy(job_name, jcr->job->name());
167    bash_spaces(job_name);
168    pm_strcpy(client_name, jcr->client->name());
169    bash_spaces(client_name);
170    pm_strcpy(fileset_name, jcr->fileset->name());
171    bash_spaces(fileset_name);
172    if (jcr->fileset->MD5[0] == 0) {
173       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
174    }
175    /* If rescheduling, cancel the previous incarnation of this job
176     *  with the SD, which might be waiting on the FD connection.
177     *  If we do not cancel it the SD will not accept a new connection
178     *  for the same jobid.
179     */
180    if (jcr->reschedule_count) {
181       sd->fsend("cancel Job=%s\n", jcr->Job);
182       while (sd->recv() >= 0)
183          { }
184    } 
185    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
186              job_name.c_str(), client_name.c_str(), 
187              jcr->JobType, jcr->JobLevel,
188              fileset_name.c_str(), !jcr->pool->catalog_files,
189              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
190              jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
191    Dmsg1(100, ">stored: %s\n", sd->msg);
192    if (bget_dirmsg(sd) > 0) {
193        Dmsg1(100, "<stored: %s", sd->msg);
194        if (debug_level == 3) {
195           Dmsg1(000, "<stored: %s", sd->msg);
196        }
197        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
198                   &jcr->VolSessionTime, &auth_key) != 3) {
199           Dmsg1(100, "BadJob=%s\n", sd->msg);
200           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
201           return 0;
202        } else {
203           jcr->sd_auth_key = bstrdup(auth_key);
204           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
205        }
206    } else {
207       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
208          sd->bstrerror());
209       return 0;
210    }
211
212    /*
213     * We have two loops here. The first comes from the 
214     *  Storage = associated with the Job, and we need 
215     *  to attach to each one.
216     * The inner loop loops over all the alternative devices
217     *  associated with each Storage. It selects the first
218     *  available one.
219     *
220     */
221    /* Do read side of storage daemon */
222    if (ok && rstore) {
223       /* For the moment, only migrate has rpool */
224       if (jcr->JobType == JT_MIGRATE) {
225          pm_strcpy(pool_type, jcr->rpool->pool_type);
226          pm_strcpy(pool_name, jcr->rpool->name());
227       } else {
228          pm_strcpy(pool_type, jcr->pool->pool_type);
229          pm_strcpy(pool_name, jcr->pool->name());
230       }
231       bash_spaces(pool_type);
232       bash_spaces(pool_name);
233       foreach_alist(storage, rstore) {
234          Dmsg1(100, "Rstore=%s\n", storage->name());
235          bash_spaces(store_name);
236          pm_strcpy(media_type, storage->media_type);
237          bash_spaces(media_type);
238          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
239                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
240          Dmsg1(100, "rstore >stored: %s", sd->msg);
241          DEVICE *dev;
242          /* Loop over alternative storage Devices until one is OK */
243          foreach_alist(dev, storage->device) {
244             pm_strcpy(device_name, dev->name());
245             bash_spaces(device_name);
246             sd->fsend(use_device, device_name.c_str());
247             Dmsg1(100, ">stored: %s", sd->msg);
248          }
249          sd->signal(BNET_EOD);           /* end of Devices */
250       }
251       sd->signal(BNET_EOD);              /* end of Storages */
252       if (bget_dirmsg(sd) > 0) {
253          Dmsg1(100, "<stored: %s", sd->msg);
254          /* ****FIXME**** save actual device name */
255          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
256       } else {
257          ok = false;
258       }
259    }
260
261    /* Do write side of storage daemon */
262    if (ok && wstore) {
263       pm_strcpy(pool_type, jcr->pool->pool_type);
264       pm_strcpy(pool_name, jcr->pool->name());
265       bash_spaces(pool_type);
266       bash_spaces(pool_name);
267       foreach_alist(storage, wstore) {
268          pm_strcpy(store_name, storage->name());
269          bash_spaces(store_name);
270          pm_strcpy(media_type, storage->media_type);
271          bash_spaces(media_type);
272          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
273                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
274
275          Dmsg1(100, "wstore >stored: %s", sd->msg);
276          DEVICE *dev;
277          /* Loop over alternative storage Devices until one is OK */
278          foreach_alist(dev, storage->device) {
279             pm_strcpy(device_name, dev->name());
280             bash_spaces(device_name);
281             sd->fsend(use_device, device_name.c_str());
282             Dmsg1(100, ">stored: %s", sd->msg);
283          }
284          sd->signal(BNET_EOD);           /* end of Devices */
285       }
286       sd->signal(BNET_EOD);              /* end of Storages */
287       if (bget_dirmsg(sd) > 0) {
288          Dmsg1(100, "<stored: %s", sd->msg);
289          /* ****FIXME**** save actual device name */
290          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
291       } else {
292          ok = false;
293       }
294    }
295    if (!ok) {
296       POOL_MEM err_msg;
297       if (sd->msg[0]) {
298          pm_strcpy(err_msg, sd->msg); /* save message */
299          Jmsg(jcr, M_FATAL, 0, _("\n"
300               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
301               device_name.c_str(), err_msg.c_str()/* sd->msg */);
302       } else { 
303          Jmsg(jcr, M_FATAL, 0, _("\n"
304               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
305               device_name.c_str());
306       }
307    } else {
308       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
309    }
310    return ok;
311 }
312
313 /*
314  * Start a thread to handle Storage daemon messages and
315  *  Catalog requests.
316  */
317 bool start_storage_daemon_message_thread(JCR *jcr)
318 {
319    int status;
320    pthread_t thid;
321
322    jcr->inc_use_count();              /* mark in use by msg thread */
323    jcr->sd_msg_thread_done = false;
324    jcr->SD_msg_chan = 0;
325    Dmsg0(100, "Start SD msg_thread.\n");
326    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
327       berrno be;
328       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
329    }
330    /* Wait for thread to start */
331    while (jcr->SD_msg_chan == 0) {
332       bmicrosleep(0, 50);
333       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
334          return false;
335       }
336    }
337    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
338    return true;
339 }
340
341 extern "C" void msg_thread_cleanup(void *arg)
342 {
343    JCR *jcr = (JCR *)arg;
344    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
345    jcr->sd_msg_thread_done = true;
346    jcr->SD_msg_chan = 0;
347    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
348    Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
349    free_jcr(jcr);                     /* release jcr */
350    db_thread_cleanup();               /* remove thread specific data */
351 }
352
353 /*
354  * Handle the message channel (i.e. requests from the
355  *  Storage daemon).
356  * Note, we are running in a separate thread.
357  */
358 extern "C" void *msg_thread(void *arg)
359 {
360    JCR *jcr = (JCR *)arg;
361    BSOCK *sd;
362    int JobStatus;
363    char Job[MAX_NAME_LENGTH];
364    uint32_t JobFiles;
365    uint64_t JobBytes;
366    int stat;
367
368    pthread_detach(pthread_self());
369    jcr->SD_msg_chan = pthread_self();
370    pthread_cleanup_push(msg_thread_cleanup, arg);
371    sd = jcr->store_bsock;
372
373    /* Read the Storage daemon's output.
374     */
375    Dmsg0(100, "Start msg_thread loop\n");
376    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
377       Dmsg1(400, "<stored: %s", sd->msg);
378       if (sscanf(sd->msg, Job_start, Job) == 1) {
379          continue;
380       }
381       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
382                  &JobBytes)) == 4) {
383          jcr->SDJobStatus = JobStatus; /* termination status */
384          jcr->SDJobFiles = JobFiles;
385          jcr->SDJobBytes = JobBytes;
386          break;
387       }
388       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
389    }
390    if (is_bnet_error(sd)) {
391       jcr->SDJobStatus = JS_ErrorTerminated;
392    }
393    pthread_cleanup_pop(1);            /* remove and execute the handler */
394    return NULL;
395 }
396
397 void wait_for_storage_daemon_termination(JCR *jcr)
398 {
399    int cancel_count = 0;
400    /* Now wait for Storage daemon to terminate our message thread */
401    while (!jcr->sd_msg_thread_done) {
402       struct timeval tv;
403       struct timezone tz;
404       struct timespec timeout;
405
406       gettimeofday(&tv, &tz);
407       timeout.tv_nsec = 0;
408       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
409       Dmsg0(400, "I'm waiting for message thread termination.\n");
410       P(mutex);
411       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
412       V(mutex);
413       if (job_canceled(jcr)) {
414          if (jcr->SD_msg_chan) {
415             jcr->store_bsock->set_timed_out();
416             jcr->store_bsock->set_terminated();
417             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
418             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
419          }
420          cancel_count++;
421       }
422       /* Give SD 30 seconds to clean up after cancel */
423       if (cancel_count == 6) {
424          break;
425       }
426    }
427    set_jcr_job_status(jcr, JS_Terminated);
428 }
429
430 #ifdef needed
431 #define MAX_TRIES 30
432 #define WAIT_TIME 2
433 extern "C" void *device_thread(void *arg)
434 {
435    int i;
436    JCR *jcr;
437    DEVICE *dev;
438
439
440    pthread_detach(pthread_self());
441    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
442    for (i=0; i < MAX_TRIES; i++) {
443       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
444          Dmsg0(900, "Failed connecting to SD.\n");
445          continue;
446       }
447       LockRes();
448       foreach_res(dev, R_DEVICE) {
449          if (!update_device_res(jcr, dev)) {
450             Dmsg1(900, "Error updating device=%s\n", dev->name());
451          } else {
452             Dmsg1(900, "Updated Device=%s\n", dev->name());
453          }
454       }
455       UnlockRes();
456       bnet_close(jcr->store_bsock);
457       jcr->store_bsock = NULL;
458       break;
459
460    }
461    free_jcr(jcr);
462    return NULL;
463 }
464
465 /*
466  * Start a thread to handle getting Device resource information
467  *  from SD. This is called once at startup of the Director.
468  */
469 void init_device_resources()
470 {
471    int status;
472    pthread_t thid;
473
474    Dmsg0(100, "Start Device thread.\n");
475    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
476       berrno be;
477       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
478    }
479 }
480 #endif