]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
Update technotes and version
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Bacula® - The Network Backup Solution
20
21    Copyright (C) 2000-2006 Free Software Foundation Europe e.V.
22
23    The main author of Bacula is Kern Sibbald, with contributions from
24    many others, a complete list can be found in the file AUTHORS.
25    This program is Free Software; you can redistribute it and/or
26    modify it under the terms of version two of the GNU General Public
27    License as published by the Free Software Foundation plus additions
28    that are listed in the file LICENSE.
29
30    This program is distributed in the hope that it will be useful, but
31    WITHOUT ANY WARRANTY; without even the implied warranty of
32    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33    General Public License for more details.
34
35    You should have received a copy of the GNU General Public License
36    along with this program; if not, write to the Free Software
37    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
38    02110-1301, USA.
39
40    Bacula® is a registered trademark of John Walker.
41    The licensor of Bacula is the Free Software Foundation Europe
42    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
43    Switzerland, email:ftf@fsfeurope.org.
44 */
45
46 #include "bacula.h"
47 #include "dird.h"
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 /* Commands sent to Storage daemon */
52 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
53    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd;
80    STORE *store;
81
82    if (jcr->store_bsock) {
83       return true;                    /* already connected */
84    }
85
86    /* If there is a write storage use it */
87    if (jcr->wstore) {
88       store = jcr->wstore;
89    } else {
90       store = jcr->rstore;
91    }
92
93    /*
94     *  Open message channel with the Storage daemon
95     */
96    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
97       store->SDport);
98    sd = bnet_connect(jcr, retry_interval, max_retry_time,
99           _("Storage daemon"), store->address,
100           NULL, store->SDport, verbose);
101    if (sd == NULL) {
102       return false;
103    }
104    sd->res = (RES *)store;        /* save pointer to other end */
105    jcr->store_bsock = sd;
106
107    if (!authenticate_storage_daemon(jcr, store)) {
108       bnet_close(sd);
109       jcr->store_bsock = NULL;
110       return false;
111    }
112    return true;
113 }
114
115 /*
116  * Here we ask the SD to send us the info for a 
117  *  particular device resource.
118  */
119 #ifdef needed
120 bool update_device_res(JCR *jcr, DEVICE *dev)
121 {
122    POOL_MEM device_name; 
123    BSOCK *sd;
124    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
125       return false;
126    }
127    sd = jcr->store_bsock;
128    pm_strcpy(device_name, dev->hdr.name);
129    bash_spaces(device_name);
130    bnet_fsend(sd, query_device, device_name.c_str());
131    Dmsg1(100, ">stored: %s\n", sd->msg);
132    /* The data is returned through Device_update */
133    if (bget_dirmsg(sd) <= 0) {
134       return false;
135    }
136    return true;
137 }
138 #endif
139
140 /*
141  * Start a job with the Storage daemon
142  */
143 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
144 {
145    bool ok = true;
146    STORE *storage;
147    BSOCK *sd;
148    char auth_key[100];
149    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
150    POOL_MEM job_name, client_name, fileset_name;
151    int copy = 0;
152    int stripe = 0;
153    char ed1[30];
154
155    sd = jcr->store_bsock;
156    /*
157     * Now send JobId and permissions, and get back the authorization key.
158     */
159    pm_strcpy(job_name, jcr->job->hdr.name);
160    bash_spaces(job_name);
161    pm_strcpy(client_name, jcr->client->hdr.name);
162    bash_spaces(client_name);
163    pm_strcpy(fileset_name, jcr->fileset->hdr.name);
164    bash_spaces(fileset_name);
165    if (jcr->fileset->MD5[0] == 0) {
166       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
167    }
168    /* If rescheduling, cancel the previous incarnation of this job
169     *  with the SD, which might be waiting on the FD connection.
170     *  If we do not cancel it the SD will not accept a new connection
171     *  for the same jobid.
172     */
173    if (jcr->reschedule_count) {
174       bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
175       while (bnet_recv(sd) >= 0)
176          { }
177    } 
178    bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
179               job_name.c_str(), client_name.c_str(), 
180               jcr->JobType, jcr->JobLevel,
181               fileset_name.c_str(), !jcr->pool->catalog_files,
182               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
183               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
184    Dmsg1(100, ">stored: %s\n", sd->msg);
185    if (bget_dirmsg(sd) > 0) {
186        Dmsg1(100, "<stored: %s", sd->msg);
187        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
188                   &jcr->VolSessionTime, &auth_key) != 3) {
189           Dmsg1(100, "BadJob=%s\n", sd->msg);
190           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
191           return 0;
192        } else {
193           jcr->sd_auth_key = bstrdup(auth_key);
194           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
195        }
196    } else {
197       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
198          bnet_strerror(sd));
199       return 0;
200    }
201
202    /*
203     * We have two loops here. The first comes from the 
204     *  Storage = associated with the Job, and we need 
205     *  to attach to each one.
206     * The inner loop loops over all the alternative devices
207     *  associated with each Storage. It selects the first
208     *  available one.
209     *
210     */
211    /* Do read side of storage daemon */
212    if (ok && rstore) {
213       /* For the moment, only migrate has rpool */
214       if (jcr->JobType == JT_MIGRATE) {
215          pm_strcpy(pool_type, jcr->rpool->pool_type);
216          pm_strcpy(pool_name, jcr->rpool->name());
217       } else {
218          pm_strcpy(pool_type, jcr->pool->pool_type);
219          pm_strcpy(pool_name, jcr->pool->name());
220       }
221       bash_spaces(pool_type);
222       bash_spaces(pool_name);
223       foreach_alist(storage, rstore) {
224          Dmsg1(100, "Rstore=%s\n", storage->name());
225          bash_spaces(store_name);
226          pm_strcpy(media_type, storage->media_type);
227          bash_spaces(media_type);
228          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
229                     pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
230          Dmsg1(100, "rstore >stored: %s", sd->msg);
231          DEVICE *dev;
232          /* Loop over alternative storage Devices until one is OK */
233          foreach_alist(dev, storage->device) {
234             pm_strcpy(device_name, dev->hdr.name);
235             bash_spaces(device_name);
236             bnet_fsend(sd, use_device, device_name.c_str());
237             Dmsg1(100, ">stored: %s", sd->msg);
238          }
239          bnet_sig(sd, BNET_EOD);            /* end of Devices */
240       }
241       bnet_sig(sd, BNET_EOD);            /* end of Storages */
242       if (bget_dirmsg(sd) > 0) {
243          Dmsg1(100, "<stored: %s", sd->msg);
244          /* ****FIXME**** save actual device name */
245          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
246       } else {
247          ok = false;
248       }
249    }
250
251    /* Do write side of storage daemon */
252    if (ok && wstore) {
253       pm_strcpy(pool_type, jcr->pool->pool_type);
254       pm_strcpy(pool_name, jcr->pool->name());
255       bash_spaces(pool_type);
256       bash_spaces(pool_name);
257       foreach_alist(storage, wstore) {
258          pm_strcpy(store_name, storage->name());
259          bash_spaces(store_name);
260          pm_strcpy(media_type, storage->media_type);
261          bash_spaces(media_type);
262          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
263                     pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
264
265          Dmsg1(100, "wstore >stored: %s", sd->msg);
266          DEVICE *dev;
267          /* Loop over alternative storage Devices until one is OK */
268          foreach_alist(dev, storage->device) {
269             pm_strcpy(device_name, dev->hdr.name);
270             bash_spaces(device_name);
271             bnet_fsend(sd, use_device, device_name.c_str());
272             Dmsg1(100, ">stored: %s", sd->msg);
273          }
274          bnet_sig(sd, BNET_EOD);            /* end of Devices */
275       }
276       bnet_sig(sd, BNET_EOD);            /* end of Storages */
277       if (bget_dirmsg(sd) > 0) {
278          Dmsg1(100, "<stored: %s", sd->msg);
279          /* ****FIXME**** save actual device name */
280          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
281       } else {
282          ok = false;
283       }
284    }
285    if (!ok) {
286       POOL_MEM err_msg;
287       if (sd->msg[0]) {
288          pm_strcpy(err_msg, sd->msg); /* save message */
289          Jmsg(jcr, M_FATAL, 0, _("\n"
290               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
291               device_name.c_str(), err_msg.c_str()/* sd->msg */);
292       } else { 
293          Jmsg(jcr, M_FATAL, 0, _("\n"
294               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
295               device_name.c_str());
296       }
297    } else {
298       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
299    }
300    return ok;
301 }
302
303 /*
304  * Start a thread to handle Storage daemon messages and
305  *  Catalog requests.
306  */
307 bool start_storage_daemon_message_thread(JCR *jcr)
308 {
309    int status;
310    pthread_t thid;
311
312    jcr->inc_use_count();              /* mark in use by msg thread */
313    jcr->sd_msg_thread_done = false;
314    jcr->SD_msg_chan = 0;
315    Dmsg0(100, "Start SD msg_thread.\n");
316    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
317       berrno be;
318       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
319    }
320    /* Wait for thread to start */
321    while (jcr->SD_msg_chan == 0) {
322       bmicrosleep(0, 50);
323       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
324          return false;
325       }
326    }
327    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
328    return true;
329 }
330
331 extern "C" void msg_thread_cleanup(void *arg)
332 {
333    JCR *jcr = (JCR *)arg;
334    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
335    jcr->sd_msg_thread_done = true;
336    jcr->SD_msg_chan = 0;
337    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
338    Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
339    free_jcr(jcr);                     /* release jcr */
340 }
341
342 /*
343  * Handle the message channel (i.e. requests from the
344  *  Storage daemon).
345  * Note, we are running in a separate thread.
346  */
347 extern "C" void *msg_thread(void *arg)
348 {
349    JCR *jcr = (JCR *)arg;
350    BSOCK *sd;
351    int JobStatus;
352    char Job[MAX_NAME_LENGTH];
353    uint32_t JobFiles;
354    uint64_t JobBytes;
355    int stat;
356
357    pthread_detach(pthread_self());
358    jcr->SD_msg_chan = pthread_self();
359    pthread_cleanup_push(msg_thread_cleanup, arg);
360    sd = jcr->store_bsock;
361
362    /* Read the Storage daemon's output.
363     */
364    Dmsg0(100, "Start msg_thread loop\n");
365    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
366       Dmsg1(400, "<stored: %s", sd->msg);
367       if (sscanf(sd->msg, Job_start, Job) == 1) {
368          continue;
369       }
370       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
371                  &JobBytes)) == 4) {
372          jcr->SDJobStatus = JobStatus; /* termination status */
373          jcr->SDJobFiles = JobFiles;
374          jcr->SDJobBytes = JobBytes;
375          break;
376       }
377       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
378    }
379    if (is_bnet_error(sd)) {
380       jcr->SDJobStatus = JS_ErrorTerminated;
381    }
382    pthread_cleanup_pop(1);            /* remove and execute the handler */
383    return NULL;
384 }
385
386 void wait_for_storage_daemon_termination(JCR *jcr)
387 {
388    int cancel_count = 0;
389    /* Now wait for Storage daemon to terminate our message thread */
390    while (!jcr->sd_msg_thread_done) {
391       struct timeval tv;
392       struct timezone tz;
393       struct timespec timeout;
394
395       gettimeofday(&tv, &tz);
396       timeout.tv_nsec = 0;
397       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
398       Dmsg0(400, "I'm waiting for message thread termination.\n");
399       P(mutex);
400       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
401       V(mutex);
402       if (job_canceled(jcr)) {
403          if (jcr->SD_msg_chan) {
404             jcr->store_bsock->timed_out = 1;
405             jcr->store_bsock->terminated = 1;
406             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
407             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
408          }
409          cancel_count++;
410       }
411       /* Give SD 30 seconds to clean up after cancel */
412       if (cancel_count == 6) {
413          break;
414       }
415    }
416    set_jcr_job_status(jcr, JS_Terminated);
417 }
418
419 #ifdef needed
420 #define MAX_TRIES 30
421 #define WAIT_TIME 2
422 extern "C" void *device_thread(void *arg)
423 {
424    int i;
425    JCR *jcr;
426    DEVICE *dev;
427
428
429    pthread_detach(pthread_self());
430    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
431    for (i=0; i < MAX_TRIES; i++) {
432       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
433          Dmsg0(900, "Failed connecting to SD.\n");
434          continue;
435       }
436       LockRes();
437       foreach_res(dev, R_DEVICE) {
438          if (!update_device_res(jcr, dev)) {
439             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
440          } else {
441             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
442          }
443       }
444       UnlockRes();
445       bnet_close(jcr->store_bsock);
446       jcr->store_bsock = NULL;
447       break;
448
449    }
450    free_jcr(jcr);
451    return NULL;
452 }
453
454 /*
455  * Start a thread to handle getting Device resource information
456  *  from SD. This is called once at startup of the Director.
457  */
458 void init_device_resources()
459 {
460    int status;
461    pthread_t thid;
462
463    Dmsg0(100, "Start Device thread.\n");
464    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
465       berrno be;
466       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
467    }
468 }
469 #endif