]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
Modify most of dird/msgchan.c to use bsock class calls rather than
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2    Bacula® - The Network Backup Solution
3
4    Copyright (C) 2000-2007 Free Software Foundation Europe e.V.
5
6    The main author of Bacula is Kern Sibbald, with contributions from
7    many others, a complete list can be found in the file AUTHORS.
8    This program is Free Software; you can redistribute it and/or
9    modify it under the terms of version two of the GNU General Public
10    License as published by the Free Software Foundation and included
11    in the file LICENSE.
12
13    This program is distributed in the hope that it will be useful, but
14    WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16    General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
21    02110-1301, USA.
22
23    Bacula® is a registered trademark of John Walker.
24    The licensor of Bacula is the Free Software Foundation Europe
25    (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26    Switzerland, email:ftf@fsfeurope.org.
27 */
28 /*
29  *
30  *   Bacula Director -- msgchan.c -- handles the message channel
31  *    to the Storage daemon and the File daemon.
32  *
33  *     Kern Sibbald, August MM
34  *
35  *    This routine runs as a thread and must be thread reentrant.
36  *
37  *  Basic tasks done here:
38  *    Open a message channel with the Storage daemon
39  *      to authenticate ourself and to pass the JobId.
40  *    Create a thread to interact with the Storage daemon
41  *      who returns a job status and requests Catalog services, etc.
42  *
43  *   Version $Id$
44  */
45
46 #include "bacula.h"
47 #include "dird.h"
48
49 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
50
51 /* Commands sent to Storage daemon */
52 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
53    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
54    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
55 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
56    "pool_type=%s append=%d copy=%d stripe=%d\n";
57 static char use_device[] = "use device=%s\n";
58 //static char query_device[] = _("query device=%s");
59
60 /* Response from Storage daemon */
61 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
62 static char OK_device[]  = "3000 OK use device device=%s\n";
63
64 /* Storage Daemon requests */
65 static char Job_start[]  = "3010 Job %127s start\n";
66 static char Job_end[]    =
67    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
68
69 /* Forward referenced functions */
70 extern "C" void *msg_thread(void *arg);
71
72 /*
73  * Establish a message channel connection with the Storage daemon
74  * and perform authentication.
75  */
76 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
77                               int max_retry_time, int verbose)
78 {
79    BSOCK *sd;
80    STORE *store;
81    utime_t heart_beat;    
82
83    if (jcr->store_bsock) {
84       return true;                    /* already connected */
85    }
86
87    /* If there is a write storage use it */
88    if (jcr->wstore) {
89       store = jcr->wstore;
90    } else {
91       store = jcr->rstore;
92    }
93
94    if (store->heartbeat_interval) {
95       heart_beat = store->heartbeat_interval;
96    } else {           
97       heart_beat = director->heartbeat_interval;
98    }
99
100    /*
101     *  Open message channel with the Storage daemon
102     */
103    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
104       store->SDport);
105    sd = bnet_connect(jcr, retry_interval, max_retry_time, heart_beat,
106           _("Storage daemon"), store->address,
107           NULL, store->SDport, verbose);
108    if (sd == NULL) {
109       return false;
110    }
111    sd->res = (RES *)store;        /* save pointer to other end */
112    jcr->store_bsock = sd;
113
114    if (!authenticate_storage_daemon(jcr, store)) {
115       sd->close();
116       jcr->store_bsock = NULL;
117       return false;
118    }
119    return true;
120 }
121
122 /*
123  * Here we ask the SD to send us the info for a 
124  *  particular device resource.
125  */
126 #ifdef needed
127 bool update_device_res(JCR *jcr, DEVICE *dev)
128 {
129    POOL_MEM device_name; 
130    BSOCK *sd;
131    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
132       return false;
133    }
134    sd = jcr->store_bsock;
135    pm_strcpy(device_name, dev->name());
136    bash_spaces(device_name);
137    sd->fsend(query_device, device_name.c_str());
138    Dmsg1(100, ">stored: %s\n", sd->msg);
139    /* The data is returned through Device_update */
140    if (bget_dirmsg(sd) <= 0) {
141       return false;
142    }
143    return true;
144 }
145 #endif
146
147 /*
148  * Start a job with the Storage daemon
149  */
150 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
151 {
152    bool ok = true;
153    STORE *storage;
154    BSOCK *sd;
155    char auth_key[100];
156    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
157    POOL_MEM job_name, client_name, fileset_name;
158    int copy = 0;
159    int stripe = 0;
160    char ed1[30];
161
162    sd = jcr->store_bsock;
163    /*
164     * Now send JobId and permissions, and get back the authorization key.
165     */
166    pm_strcpy(job_name, jcr->job->name());
167    bash_spaces(job_name);
168    pm_strcpy(client_name, jcr->client->name());
169    bash_spaces(client_name);
170    pm_strcpy(fileset_name, jcr->fileset->name());
171    bash_spaces(fileset_name);
172    if (jcr->fileset->MD5[0] == 0) {
173       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
174    }
175    /* If rescheduling, cancel the previous incarnation of this job
176     *  with the SD, which might be waiting on the FD connection.
177     *  If we do not cancel it the SD will not accept a new connection
178     *  for the same jobid.
179     */
180    if (jcr->reschedule_count) {
181       sd->fsend("cancel Job=%s\n", jcr->Job);
182       while (sd->recv() >= 0)
183          { }
184    } 
185    sd->fsend(jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
186              job_name.c_str(), client_name.c_str(), 
187              jcr->JobType, jcr->JobLevel,
188              fileset_name.c_str(), !jcr->pool->catalog_files,
189              jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
190              jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
191    Dmsg1(100, ">stored: %s\n", sd->msg);
192    if (bget_dirmsg(sd) > 0) {
193        Dmsg1(100, "<stored: %s", sd->msg);
194        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
195                   &jcr->VolSessionTime, &auth_key) != 3) {
196           Dmsg1(100, "BadJob=%s\n", sd->msg);
197           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
198           return 0;
199        } else {
200           jcr->sd_auth_key = bstrdup(auth_key);
201           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
202        }
203    } else {
204       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
205          sd->bstrerror());
206       return 0;
207    }
208
209    /*
210     * We have two loops here. The first comes from the 
211     *  Storage = associated with the Job, and we need 
212     *  to attach to each one.
213     * The inner loop loops over all the alternative devices
214     *  associated with each Storage. It selects the first
215     *  available one.
216     *
217     */
218    /* Do read side of storage daemon */
219    if (ok && rstore) {
220       /* For the moment, only migrate has rpool */
221       if (jcr->JobType == JT_MIGRATE) {
222          pm_strcpy(pool_type, jcr->rpool->pool_type);
223          pm_strcpy(pool_name, jcr->rpool->name());
224       } else {
225          pm_strcpy(pool_type, jcr->pool->pool_type);
226          pm_strcpy(pool_name, jcr->pool->name());
227       }
228       bash_spaces(pool_type);
229       bash_spaces(pool_name);
230       foreach_alist(storage, rstore) {
231          Dmsg1(100, "Rstore=%s\n", storage->name());
232          bash_spaces(store_name);
233          pm_strcpy(media_type, storage->media_type);
234          bash_spaces(media_type);
235          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
236                    pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
237          Dmsg1(100, "rstore >stored: %s", sd->msg);
238          DEVICE *dev;
239          /* Loop over alternative storage Devices until one is OK */
240          foreach_alist(dev, storage->device) {
241             pm_strcpy(device_name, dev->name());
242             bash_spaces(device_name);
243             sd->fsend(use_device, device_name.c_str());
244             Dmsg1(100, ">stored: %s", sd->msg);
245          }
246          sd->signal(BNET_EOD);           /* end of Devices */
247       }
248       sd->signal(BNET_EOD);              /* end of Storages */
249       if (bget_dirmsg(sd) > 0) {
250          Dmsg1(100, "<stored: %s", sd->msg);
251          /* ****FIXME**** save actual device name */
252          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
253       } else {
254          ok = false;
255       }
256    }
257
258    /* Do write side of storage daemon */
259    if (ok && wstore) {
260       pm_strcpy(pool_type, jcr->pool->pool_type);
261       pm_strcpy(pool_name, jcr->pool->name());
262       bash_spaces(pool_type);
263       bash_spaces(pool_name);
264       foreach_alist(storage, wstore) {
265          pm_strcpy(store_name, storage->name());
266          bash_spaces(store_name);
267          pm_strcpy(media_type, storage->media_type);
268          bash_spaces(media_type);
269          sd->fsend(use_storage, store_name.c_str(), media_type.c_str(), 
270                    pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
271
272          Dmsg1(100, "wstore >stored: %s", sd->msg);
273          DEVICE *dev;
274          /* Loop over alternative storage Devices until one is OK */
275          foreach_alist(dev, storage->device) {
276             pm_strcpy(device_name, dev->name());
277             bash_spaces(device_name);
278             sd->fsend(use_device, device_name.c_str());
279             Dmsg1(100, ">stored: %s", sd->msg);
280          }
281          sd->signal(BNET_EOD);           /* end of Devices */
282       }
283       sd->signal(BNET_EOD);              /* end of Storages */
284       if (bget_dirmsg(sd) > 0) {
285          Dmsg1(100, "<stored: %s", sd->msg);
286          /* ****FIXME**** save actual device name */
287          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
288       } else {
289          ok = false;
290       }
291    }
292    if (!ok) {
293       POOL_MEM err_msg;
294       if (sd->msg[0]) {
295          pm_strcpy(err_msg, sd->msg); /* save message */
296          Jmsg(jcr, M_FATAL, 0, _("\n"
297               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
298               device_name.c_str(), err_msg.c_str()/* sd->msg */);
299       } else { 
300          Jmsg(jcr, M_FATAL, 0, _("\n"
301               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
302               device_name.c_str());
303       }
304    } else {
305       Jmsg(jcr, M_INFO, 0, _("Using Device \"%s\"\n"), device_name.c_str());
306    }
307    return ok;
308 }
309
310 /*
311  * Start a thread to handle Storage daemon messages and
312  *  Catalog requests.
313  */
314 bool start_storage_daemon_message_thread(JCR *jcr)
315 {
316    int status;
317    pthread_t thid;
318
319    jcr->inc_use_count();              /* mark in use by msg thread */
320    jcr->sd_msg_thread_done = false;
321    jcr->SD_msg_chan = 0;
322    Dmsg0(100, "Start SD msg_thread.\n");
323    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
324       berrno be;
325       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
326    }
327    /* Wait for thread to start */
328    while (jcr->SD_msg_chan == 0) {
329       bmicrosleep(0, 50);
330       if (job_canceled(jcr) || jcr->sd_msg_thread_done) {
331          return false;
332       }
333    }
334    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
335    return true;
336 }
337
338 extern "C" void msg_thread_cleanup(void *arg)
339 {
340    JCR *jcr = (JCR *)arg;
341    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
342    jcr->sd_msg_thread_done = true;
343    jcr->SD_msg_chan = 0;
344    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
345    Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
346    free_jcr(jcr);                     /* release jcr */
347    db_thread_cleanup();               /* remove thread specific data */
348 }
349
350 /*
351  * Handle the message channel (i.e. requests from the
352  *  Storage daemon).
353  * Note, we are running in a separate thread.
354  */
355 extern "C" void *msg_thread(void *arg)
356 {
357    JCR *jcr = (JCR *)arg;
358    BSOCK *sd;
359    int JobStatus;
360    char Job[MAX_NAME_LENGTH];
361    uint32_t JobFiles;
362    uint64_t JobBytes;
363    int stat;
364
365    pthread_detach(pthread_self());
366    jcr->SD_msg_chan = pthread_self();
367    pthread_cleanup_push(msg_thread_cleanup, arg);
368    sd = jcr->store_bsock;
369
370    /* Read the Storage daemon's output.
371     */
372    Dmsg0(100, "Start msg_thread loop\n");
373    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
374       Dmsg1(400, "<stored: %s", sd->msg);
375       if (sscanf(sd->msg, Job_start, Job) == 1) {
376          continue;
377       }
378       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
379                  &JobBytes)) == 4) {
380          jcr->SDJobStatus = JobStatus; /* termination status */
381          jcr->SDJobFiles = JobFiles;
382          jcr->SDJobBytes = JobBytes;
383          break;
384       }
385       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
386    }
387    if (is_bnet_error(sd)) {
388       jcr->SDJobStatus = JS_ErrorTerminated;
389    }
390    pthread_cleanup_pop(1);            /* remove and execute the handler */
391    return NULL;
392 }
393
394 void wait_for_storage_daemon_termination(JCR *jcr)
395 {
396    int cancel_count = 0;
397    /* Now wait for Storage daemon to terminate our message thread */
398    while (!jcr->sd_msg_thread_done) {
399       struct timeval tv;
400       struct timezone tz;
401       struct timespec timeout;
402
403       gettimeofday(&tv, &tz);
404       timeout.tv_nsec = 0;
405       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
406       Dmsg0(400, "I'm waiting for message thread termination.\n");
407       P(mutex);
408       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
409       V(mutex);
410       if (job_canceled(jcr)) {
411          if (jcr->SD_msg_chan) {
412             jcr->store_bsock->set_timed_out();
413             jcr->store_bsock->set_terminated();
414             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
415             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
416          }
417          cancel_count++;
418       }
419       /* Give SD 30 seconds to clean up after cancel */
420       if (cancel_count == 6) {
421          break;
422       }
423    }
424    set_jcr_job_status(jcr, JS_Terminated);
425 }
426
427 #ifdef needed
428 #define MAX_TRIES 30
429 #define WAIT_TIME 2
430 extern "C" void *device_thread(void *arg)
431 {
432    int i;
433    JCR *jcr;
434    DEVICE *dev;
435
436
437    pthread_detach(pthread_self());
438    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
439    for (i=0; i < MAX_TRIES; i++) {
440       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
441          Dmsg0(900, "Failed connecting to SD.\n");
442          continue;
443       }
444       LockRes();
445       foreach_res(dev, R_DEVICE) {
446          if (!update_device_res(jcr, dev)) {
447             Dmsg1(900, "Error updating device=%s\n", dev->name());
448          } else {
449             Dmsg1(900, "Updated Device=%s\n", dev->name());
450          }
451       }
452       UnlockRes();
453       bnet_close(jcr->store_bsock);
454       jcr->store_bsock = NULL;
455       break;
456
457    }
458    free_jcr(jcr);
459    return NULL;
460 }
461
462 /*
463  * Start a thread to handle getting Device resource information
464  *  from SD. This is called once at startup of the Director.
465  */
466 void init_device_resources()
467 {
468    int status;
469    pthread_t thid;
470
471    Dmsg0(100, "Start Device thread.\n");
472    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
473       berrno be;
474       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.bstrerror(status));
475    }
476 }
477 #endif