]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
Update copyright
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Bacula® - The Network Backup Solution
20
21    Copyright (C) 2000-2006 Free Software Foundation Europe e.V.
22
23    The main author of Bacula is Kern Sibbald, with contributions from
24    many others, a complete list can be found in the file AUTHORS.
25    This program is Free Software; you can redistribute it and/or
26    modify it under the terms of version two of the GNU General Public
27    License as published by the Free Software Foundation plus additions
28    that are listed in the file LICENSE.
29
30    This program is distributed in the hope that it will be useful, but
31    WITHOUT ANY WARRANTY; without even the implied warranty of
32    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33    General Public License for more details.
34
35    You should have received a copy of the GNU General Public License
36    along with this program; if not, write to the Free Software
37    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
38    02110-1301, USA.
39
40    Bacula® is a registered trademark of John Walker.
41    The licensor of Bacula is the Free Software Foundation Europe
42    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
43    Switzerland, email:ftf@fsfeurope.org.
44 */
45
46 #include "bacula.h"
47 #include "dird.h"
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 /* Commands sent to Storage daemon */
52 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
53    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd;
80    STORE *store;
81
82    if (jcr->store_bsock) {
83       return true;                    /* already connected */
84    }
85
86    /* If there is a write storage use it */
87    if (jcr->wstorage) {
88       store = (STORE *)jcr->wstorage->first();
89    } else {
90       store = (STORE *)jcr->rstorage->first();
91    }
92
93    /*
94     *  Open message channel with the Storage daemon
95     */
96    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
97       store->SDport);
98    sd = bnet_connect(jcr, retry_interval, max_retry_time,
99           _("Storage daemon"), store->address,
100           NULL, store->SDport, verbose);
101    if (sd == NULL) {
102       return false;
103    }
104    sd->res = (RES *)store;        /* save pointer to other end */
105    jcr->store_bsock = sd;
106
107    if (!authenticate_storage_daemon(jcr, store)) {
108       bnet_close(sd);
109       jcr->store_bsock = NULL;
110       return false;
111    }
112    return true;
113 }
114
115 /*
116  * Here we ask the SD to send us the info for a 
117  *  particular device resource.
118  */
119 #ifdef needed
120 bool update_device_res(JCR *jcr, DEVICE *dev)
121 {
122    POOL_MEM device_name; 
123    BSOCK *sd;
124    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
125       return false;
126    }
127    sd = jcr->store_bsock;
128    pm_strcpy(device_name, dev->hdr.name);
129    bash_spaces(device_name);
130    bnet_fsend(sd, query_device, device_name.c_str());
131    Dmsg1(100, ">stored: %s\n", sd->msg);
132    /* The data is returned through Device_update */
133    if (bget_dirmsg(sd) <= 0) {
134       return false;
135    }
136    return true;
137 }
138 #endif
139
140 /*
141  * Start a job with the Storage daemon
142  */
143 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
144 {
145    bool ok = true;
146    STORE *storage;
147    BSOCK *sd;
148    char auth_key[100];
149    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
150    POOL_MEM job_name, client_name, fileset_name;
151    int copy = 0;
152    int stripe = 0;
153    char ed1[30];
154
155    sd = jcr->store_bsock;
156    /*
157     * Now send JobId and permissions, and get back the authorization key.
158     */
159    pm_strcpy(job_name, jcr->job->hdr.name);
160    bash_spaces(job_name);
161    pm_strcpy(client_name, jcr->client->hdr.name);
162    bash_spaces(client_name);
163    pm_strcpy(fileset_name, jcr->fileset->hdr.name);
164    bash_spaces(fileset_name);
165    if (jcr->fileset->MD5[0] == 0) {
166       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
167    }
168    /* If rescheduling, cancel the previous incarnation of this job
169     *  with the SD, which might be waiting on the FD connection.
170     *  If we do not cancel it the SD will not accept a new connection
171     *  for the same jobid.
172     */
173    if (jcr->reschedule_count) {
174       bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
175       while (bnet_recv(sd) >= 0)
176          { }
177    } 
178    bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
179               job_name.c_str(), client_name.c_str(), 
180               jcr->JobType, jcr->JobLevel,
181               fileset_name.c_str(), !jcr->pool->catalog_files,
182               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
183               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
184    Dmsg1(100, ">stored: %s\n", sd->msg);
185    if (bget_dirmsg(sd) > 0) {
186        Dmsg1(100, "<stored: %s", sd->msg);
187        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
188                   &jcr->VolSessionTime, &auth_key) != 3) {
189           Dmsg1(100, "BadJob=%s\n", sd->msg);
190           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
191           return 0;
192        } else {
193           jcr->sd_auth_key = bstrdup(auth_key);
194           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
195        }
196    } else {
197       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
198          bnet_strerror(sd));
199       return 0;
200    }
201
202    pm_strcpy(pool_type, jcr->pool->pool_type);
203    pm_strcpy(pool_name, jcr->pool->hdr.name);
204    bash_spaces(pool_type);
205    bash_spaces(pool_name);
206
207    /*
208     * We have two loops here. The first comes from the 
209     *  Storage = associated with the Job, and we need 
210     *  to attach to each one.
211     * The inner loop loops over all the alternative devices
212     *  associated with each Storage. It selects the first
213     *  available one.
214     *
215     */
216    /* Do read side of storage daemon */
217    if (ok && rstore) {
218       foreach_alist(storage, rstore) {
219          pm_strcpy(store_name, storage->hdr.name);
220          bash_spaces(store_name);
221          pm_strcpy(media_type, storage->media_type);
222          bash_spaces(media_type);
223          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
224                     pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
225
226          DEVICE *dev;
227          /* Loop over alternative storage Devices until one is OK */
228          foreach_alist(dev, storage->device) {
229             pm_strcpy(device_name, dev->hdr.name);
230             bash_spaces(device_name);
231             bnet_fsend(sd, use_device, device_name.c_str());
232             Dmsg1(100, ">stored: %s", sd->msg);
233          }
234          bnet_sig(sd, BNET_EOD);            /* end of Devices */
235       }
236       bnet_sig(sd, BNET_EOD);            /* end of Storages */
237       if (bget_dirmsg(sd) > 0) {
238          Dmsg1(100, "<stored: %s", sd->msg);
239          /* ****FIXME**** save actual device name */
240          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
241       } else {
242          ok = false;
243       }
244    }
245
246    /* Do write side of storage daemon */
247    if (ok && wstore) {
248       foreach_alist(storage, wstore) {
249          pm_strcpy(store_name, storage->hdr.name);
250          bash_spaces(store_name);
251          pm_strcpy(media_type, storage->media_type);
252          bash_spaces(media_type);
253          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
254                     pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
255
256          DEVICE *dev;
257          /* Loop over alternative storage Devices until one is OK */
258          foreach_alist(dev, storage->device) {
259             pm_strcpy(device_name, dev->hdr.name);
260             bash_spaces(device_name);
261             bnet_fsend(sd, use_device, device_name.c_str());
262             Dmsg1(100, ">stored: %s", sd->msg);
263          }
264          bnet_sig(sd, BNET_EOD);            /* end of Devices */
265       }
266       bnet_sig(sd, BNET_EOD);            /* end of Storages */
267       if (bget_dirmsg(sd) > 0) {
268          Dmsg1(100, "<stored: %s", sd->msg);
269          /* ****FIXME**** save actual device name */
270          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
271       } else {
272          ok = false;
273       }
274    }
275    if (!ok) {
276       POOL_MEM err_msg;
277       if (sd->msg[0]) {
278          pm_strcpy(err_msg, sd->msg); /* save message */
279          Jmsg(jcr, M_FATAL, 0, _("\n"
280               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
281               device_name.c_str(), err_msg.c_str()/* sd->msg */);
282       } else { 
283          Jmsg(jcr, M_FATAL, 0, _("\n"
284               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
285               device_name.c_str());
286       }
287    }
288    return ok;
289 }
290
291 /*
292  * Start a thread to handle Storage daemon messages and
293  *  Catalog requests.
294  */
295 int start_storage_daemon_message_thread(JCR *jcr)
296 {
297    int status;
298    pthread_t thid;
299
300    jcr->inc_use_count();              /* mark in use by msg thread */
301    jcr->sd_msg_thread_done = false;
302    jcr->SD_msg_chan = 0;
303    Dmsg0(100, "Start SD msg_thread.\n");
304    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
305       berrno be;
306       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
307    }
308    /* Wait for thread to start */
309    while (jcr->SD_msg_chan == 0) {
310       bmicrosleep(0, 50);
311    }
312    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
313    return 1;
314 }
315
316 extern "C" void msg_thread_cleanup(void *arg)
317 {
318    JCR *jcr = (JCR *)arg;
319    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
320    jcr->sd_msg_thread_done = true;
321    jcr->SD_msg_chan = 0;
322    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
323    Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
324    free_jcr(jcr);                     /* release jcr */
325 }
326
327 /*
328  * Handle the message channel (i.e. requests from the
329  *  Storage daemon).
330  * Note, we are running in a separate thread.
331  */
332 extern "C" void *msg_thread(void *arg)
333 {
334    JCR *jcr = (JCR *)arg;
335    BSOCK *sd;
336    int JobStatus;
337    char Job[MAX_NAME_LENGTH];
338    uint32_t JobFiles;
339    uint64_t JobBytes;
340    int stat;
341
342    pthread_detach(pthread_self());
343    jcr->SD_msg_chan = pthread_self();
344    pthread_cleanup_push(msg_thread_cleanup, arg);
345    sd = jcr->store_bsock;
346
347    /* Read the Storage daemon's output.
348     */
349    Dmsg0(100, "Start msg_thread loop\n");
350    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
351       Dmsg1(400, "<stored: %s", sd->msg);
352       if (sscanf(sd->msg, Job_start, Job) == 1) {
353          continue;
354       }
355       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
356                  &JobBytes)) == 4) {
357          jcr->SDJobStatus = JobStatus; /* termination status */
358          jcr->SDJobFiles = JobFiles;
359          jcr->SDJobBytes = JobBytes;
360          break;
361       }
362       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
363    }
364    if (is_bnet_error(sd)) {
365       jcr->SDJobStatus = JS_ErrorTerminated;
366    }
367    pthread_cleanup_pop(1);            /* remove and execute the handler */
368    return NULL;
369 }
370
371 void wait_for_storage_daemon_termination(JCR *jcr)
372 {
373    int cancel_count = 0;
374    /* Now wait for Storage daemon to terminate our message thread */
375    while (!jcr->sd_msg_thread_done) {
376       struct timeval tv;
377       struct timezone tz;
378       struct timespec timeout;
379
380       gettimeofday(&tv, &tz);
381       timeout.tv_nsec = 0;
382       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
383       Dmsg0(400, "I'm waiting for message thread termination.\n");
384       P(mutex);
385       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
386       V(mutex);
387       if (job_canceled(jcr)) {
388          if (jcr->SD_msg_chan) {
389             jcr->store_bsock->timed_out = 1;
390             jcr->store_bsock->terminated = 1;
391             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
392             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
393          }
394          cancel_count++;
395       }
396       /* Give SD 30 seconds to clean up after cancel */
397       if (cancel_count == 6) {
398          break;
399       }
400    }
401    set_jcr_job_status(jcr, JS_Terminated);
402 }
403
404 #ifdef needed
405 #define MAX_TRIES 30
406 #define WAIT_TIME 2
407 extern "C" void *device_thread(void *arg)
408 {
409    int i;
410    JCR *jcr;
411    DEVICE *dev;
412
413
414    pthread_detach(pthread_self());
415    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
416    for (i=0; i < MAX_TRIES; i++) {
417       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
418          Dmsg0(900, "Failed connecting to SD.\n");
419          continue;
420       }
421       LockRes();
422       foreach_res(dev, R_DEVICE) {
423          if (!update_device_res(jcr, dev)) {
424             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
425          } else {
426             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
427          }
428       }
429       UnlockRes();
430       bnet_close(jcr->store_bsock);
431       jcr->store_bsock = NULL;
432       break;
433
434    }
435    free_jcr(jcr);
436    return NULL;
437 }
438
439 /*
440  * Start a thread to handle getting Device resource information
441  *  from SD. This is called once at startup of the Director.
442  */
443 void init_device_resources()
444 {
445    int status;
446    pthread_t thid;
447
448    Dmsg0(100, "Start Device thread.\n");
449    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
450       berrno be;
451       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
452    }
453 }
454 #endif