]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
kes More copyright changes.
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Bacula® - The Network Backup Solution
20
21    Copyright (C) 2000-2006 Free Software Foundation Europe e.V.
22
23    The main author of Bacula is Kern Sibbald, with contributions from
24    many others, a complete list can be found in the file AUTHORS.
25    This program is Free Software; you can redistribute it and/or
26    modify it under the terms of version two of the GNU General Public
27    License as published by the Free Software Foundation plus additions
28    that are listed in the file LICENSE.
29
30    This program is distributed in the hope that it will be useful, but
31    WITHOUT ANY WARRANTY; without even the implied warranty of
32    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33    General Public License for more details.
34
35    You should have received a copy of the GNU General Public License
36    along with this program; if not, write to the Free Software
37    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
38    02110-1301, USA.
39
40    Bacula® is a registered trademark of John Walker.
41    The licensor of Bacula is the Free Software Foundation Europe
42    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
43    Switzerland, email:ftf@fsfeurope.org.
44 */
45
46 #include "bacula.h"
47 #include "dird.h"
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 /* Commands sent to Storage daemon */
52 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
53    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd;
80    STORE *store;
81
82    if (jcr->store_bsock) {
83       return true;                    /* already connected */
84    }
85
86    /* If there is a write storage use it */
87    if (jcr->wstore) {
88       store = jcr->wstore;
89    } else {
90       store = jcr->rstore;
91    }
92
93    /*
94     *  Open message channel with the Storage daemon
95     */
96    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
97       store->SDport);
98    sd = bnet_connect(jcr, retry_interval, max_retry_time,
99           _("Storage daemon"), store->address,
100           NULL, store->SDport, verbose);
101    if (sd == NULL) {
102       return false;
103    }
104    sd->res = (RES *)store;        /* save pointer to other end */
105    jcr->store_bsock = sd;
106
107    if (!authenticate_storage_daemon(jcr, store)) {
108       bnet_close(sd);
109       jcr->store_bsock = NULL;
110       return false;
111    }
112    return true;
113 }
114
115 /*
116  * Here we ask the SD to send us the info for a 
117  *  particular device resource.
118  */
119 #ifdef needed
120 bool update_device_res(JCR *jcr, DEVICE *dev)
121 {
122    POOL_MEM device_name; 
123    BSOCK *sd;
124    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
125       return false;
126    }
127    sd = jcr->store_bsock;
128    pm_strcpy(device_name, dev->hdr.name);
129    bash_spaces(device_name);
130    bnet_fsend(sd, query_device, device_name.c_str());
131    Dmsg1(100, ">stored: %s\n", sd->msg);
132    /* The data is returned through Device_update */
133    if (bget_dirmsg(sd) <= 0) {
134       return false;
135    }
136    return true;
137 }
138 #endif
139
140 /*
141  * Start a job with the Storage daemon
142  */
143 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
144 {
145    bool ok = true;
146    STORE *storage;
147    BSOCK *sd;
148    char auth_key[100];
149    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
150    POOL_MEM job_name, client_name, fileset_name;
151    int copy = 0;
152    int stripe = 0;
153    char ed1[30];
154
155    sd = jcr->store_bsock;
156    /*
157     * Now send JobId and permissions, and get back the authorization key.
158     */
159    pm_strcpy(job_name, jcr->job->hdr.name);
160    bash_spaces(job_name);
161    pm_strcpy(client_name, jcr->client->hdr.name);
162    bash_spaces(client_name);
163    pm_strcpy(fileset_name, jcr->fileset->hdr.name);
164    bash_spaces(fileset_name);
165    if (jcr->fileset->MD5[0] == 0) {
166       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
167    }
168    /* If rescheduling, cancel the previous incarnation of this job
169     *  with the SD, which might be waiting on the FD connection.
170     *  If we do not cancel it the SD will not accept a new connection
171     *  for the same jobid.
172     */
173    if (jcr->reschedule_count) {
174       bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
175       while (bnet_recv(sd) >= 0)
176          { }
177    } 
178    bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
179               job_name.c_str(), client_name.c_str(), 
180               jcr->JobType, jcr->JobLevel,
181               fileset_name.c_str(), !jcr->pool->catalog_files,
182               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
183               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
184    Dmsg1(100, ">stored: %s\n", sd->msg);
185    if (bget_dirmsg(sd) > 0) {
186        Dmsg1(100, "<stored: %s", sd->msg);
187        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
188                   &jcr->VolSessionTime, &auth_key) != 3) {
189           Dmsg1(100, "BadJob=%s\n", sd->msg);
190           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
191           return 0;
192        } else {
193           jcr->sd_auth_key = bstrdup(auth_key);
194           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
195        }
196    } else {
197       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
198          bnet_strerror(sd));
199       return 0;
200    }
201
202    pm_strcpy(pool_type, jcr->pool->pool_type);
203    pm_strcpy(pool_name, jcr->pool->hdr.name);
204    bash_spaces(pool_type);
205    bash_spaces(pool_name);
206
207    /*
208     * We have two loops here. The first comes from the 
209     *  Storage = associated with the Job, and we need 
210     *  to attach to each one.
211     * The inner loop loops over all the alternative devices
212     *  associated with each Storage. It selects the first
213     *  available one.
214     *
215     */
216    /* Do read side of storage daemon */
217    if (ok && rstore) {
218       foreach_alist(storage, rstore) {
219          Dmsg1(100, "Rstore=%s\n", storage->name());
220          pm_strcpy(store_name, storage->name());
221          bash_spaces(store_name);
222          pm_strcpy(media_type, storage->media_type);
223          bash_spaces(media_type);
224          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
225                     pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
226
227          DEVICE *dev;
228          /* Loop over alternative storage Devices until one is OK */
229          foreach_alist(dev, storage->device) {
230             pm_strcpy(device_name, dev->hdr.name);
231             bash_spaces(device_name);
232             bnet_fsend(sd, use_device, device_name.c_str());
233             Dmsg1(100, ">stored: %s", sd->msg);
234          }
235          bnet_sig(sd, BNET_EOD);            /* end of Devices */
236       }
237       bnet_sig(sd, BNET_EOD);            /* end of Storages */
238       if (bget_dirmsg(sd) > 0) {
239          Dmsg1(100, "<stored: %s", sd->msg);
240          /* ****FIXME**** save actual device name */
241          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
242       } else {
243          ok = false;
244       }
245    }
246
247    /* Do write side of storage daemon */
248    if (ok && wstore) {
249       foreach_alist(storage, wstore) {
250          Dmsg1(100, "Wstore=%s\n", storage->name());
251          pm_strcpy(store_name, storage->name());
252          bash_spaces(store_name);
253          pm_strcpy(media_type, storage->media_type);
254          bash_spaces(media_type);
255          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
256                     pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
257
258          DEVICE *dev;
259          /* Loop over alternative storage Devices until one is OK */
260          foreach_alist(dev, storage->device) {
261             pm_strcpy(device_name, dev->hdr.name);
262             bash_spaces(device_name);
263             bnet_fsend(sd, use_device, device_name.c_str());
264             Dmsg1(100, ">stored: %s", sd->msg);
265          }
266          bnet_sig(sd, BNET_EOD);            /* end of Devices */
267       }
268       bnet_sig(sd, BNET_EOD);            /* end of Storages */
269       if (bget_dirmsg(sd) > 0) {
270          Dmsg1(100, "<stored: %s", sd->msg);
271          /* ****FIXME**** save actual device name */
272          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
273       } else {
274          ok = false;
275       }
276    }
277    if (!ok) {
278       POOL_MEM err_msg;
279       if (sd->msg[0]) {
280          pm_strcpy(err_msg, sd->msg); /* save message */
281          Jmsg(jcr, M_FATAL, 0, _("\n"
282               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
283               device_name.c_str(), err_msg.c_str()/* sd->msg */);
284       } else { 
285          Jmsg(jcr, M_FATAL, 0, _("\n"
286               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
287               device_name.c_str());
288       }
289    }
290    return ok;
291 }
292
293 /*
294  * Start a thread to handle Storage daemon messages and
295  *  Catalog requests.
296  */
297 int start_storage_daemon_message_thread(JCR *jcr)
298 {
299    int status;
300    pthread_t thid;
301
302    jcr->inc_use_count();              /* mark in use by msg thread */
303    jcr->sd_msg_thread_done = false;
304    jcr->SD_msg_chan = 0;
305    Dmsg0(100, "Start SD msg_thread.\n");
306    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
307       berrno be;
308       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
309    }
310    /* Wait for thread to start */
311    while (jcr->SD_msg_chan == 0) {
312       bmicrosleep(0, 50);
313    }
314    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
315    return 1;
316 }
317
318 extern "C" void msg_thread_cleanup(void *arg)
319 {
320    JCR *jcr = (JCR *)arg;
321    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
322    jcr->sd_msg_thread_done = true;
323    jcr->SD_msg_chan = 0;
324    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
325    Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
326    free_jcr(jcr);                     /* release jcr */
327 }
328
329 /*
330  * Handle the message channel (i.e. requests from the
331  *  Storage daemon).
332  * Note, we are running in a separate thread.
333  */
334 extern "C" void *msg_thread(void *arg)
335 {
336    JCR *jcr = (JCR *)arg;
337    BSOCK *sd;
338    int JobStatus;
339    char Job[MAX_NAME_LENGTH];
340    uint32_t JobFiles;
341    uint64_t JobBytes;
342    int stat;
343
344    pthread_detach(pthread_self());
345    jcr->SD_msg_chan = pthread_self();
346    pthread_cleanup_push(msg_thread_cleanup, arg);
347    sd = jcr->store_bsock;
348
349    /* Read the Storage daemon's output.
350     */
351    Dmsg0(100, "Start msg_thread loop\n");
352    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
353       Dmsg1(400, "<stored: %s", sd->msg);
354       if (sscanf(sd->msg, Job_start, Job) == 1) {
355          continue;
356       }
357       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
358                  &JobBytes)) == 4) {
359          jcr->SDJobStatus = JobStatus; /* termination status */
360          jcr->SDJobFiles = JobFiles;
361          jcr->SDJobBytes = JobBytes;
362          break;
363       }
364       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
365    }
366    if (is_bnet_error(sd)) {
367       jcr->SDJobStatus = JS_ErrorTerminated;
368    }
369    pthread_cleanup_pop(1);            /* remove and execute the handler */
370    return NULL;
371 }
372
373 void wait_for_storage_daemon_termination(JCR *jcr)
374 {
375    int cancel_count = 0;
376    /* Now wait for Storage daemon to terminate our message thread */
377    while (!jcr->sd_msg_thread_done) {
378       struct timeval tv;
379       struct timezone tz;
380       struct timespec timeout;
381
382       gettimeofday(&tv, &tz);
383       timeout.tv_nsec = 0;
384       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
385       Dmsg0(400, "I'm waiting for message thread termination.\n");
386       P(mutex);
387       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
388       V(mutex);
389       if (job_canceled(jcr)) {
390          if (jcr->SD_msg_chan) {
391             jcr->store_bsock->timed_out = 1;
392             jcr->store_bsock->terminated = 1;
393             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
394             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
395          }
396          cancel_count++;
397       }
398       /* Give SD 30 seconds to clean up after cancel */
399       if (cancel_count == 6) {
400          break;
401       }
402    }
403    set_jcr_job_status(jcr, JS_Terminated);
404 }
405
406 #ifdef needed
407 #define MAX_TRIES 30
408 #define WAIT_TIME 2
409 extern "C" void *device_thread(void *arg)
410 {
411    int i;
412    JCR *jcr;
413    DEVICE *dev;
414
415
416    pthread_detach(pthread_self());
417    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
418    for (i=0; i < MAX_TRIES; i++) {
419       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
420          Dmsg0(900, "Failed connecting to SD.\n");
421          continue;
422       }
423       LockRes();
424       foreach_res(dev, R_DEVICE) {
425          if (!update_device_res(jcr, dev)) {
426             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
427          } else {
428             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
429          }
430       }
431       UnlockRes();
432       bnet_close(jcr->store_bsock);
433       jcr->store_bsock = NULL;
434       break;
435
436    }
437    free_jcr(jcr);
438    return NULL;
439 }
440
441 /*
442  * Start a thread to handle getting Device resource information
443  *  from SD. This is called once at startup of the Director.
444  */
445 void init_device_resources()
446 {
447    int status;
448    pthread_t thid;
449
450    Dmsg0(100, "Start Device thread.\n");
451    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
452       berrno be;
453       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
454    }
455 }
456 #endif