]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
17Mar06
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Copyright (C) 2000-2006 Kern Sibbald
20
21    This program is free software; you can redistribute it and/or
22    modify it under the terms of the GNU General Public License
23    version 2 as amended with additional clauses defined in the
24    file LICENSE in the main source directory.
25
26    This program is distributed in the hope that it will be useful,
27    but WITHOUT ANY WARRANTY; without even the implied warranty of
28    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
29    the file LICENSE for additional details.
30
31  */
32
33 #include "bacula.h"
34 #include "dird.h"
35
36 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
37
38 /* Commands sent to Storage daemon */
39 static char jobcmd[]     = "JobId=%d job=%s job_name=%s client_name=%s "
40    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
41    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
42 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
43    "pool_type=%s append=%d copy=%d stripe=%d\n";
44 static char use_device[] = "use device=%s\n";
45 //static char query_device[] = _("query device=%s");
46
47 /* Response from Storage daemon */
48 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
49 static char OK_device[]  = "3000 OK use device device=%s\n";
50
51 /* Storage Daemon requests */
52 static char Job_start[]  = "3010 Job %127s start\n";
53 static char Job_end[]    =
54    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%lld\n";
55
56 /* Forward referenced functions */
57 extern "C" void *msg_thread(void *arg);
58
59 /*
60  * Establish a message channel connection with the Storage daemon
61  * and perform authentication.
62  */
63 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
64                               int max_retry_time, int verbose)
65 {
66    BSOCK *sd;
67    STORE *store;
68
69    if (jcr->store_bsock) {
70       return true;                    /* already connected */
71    }
72    store = (STORE *)jcr->storage->first();
73
74    /*
75     *  Open message channel with the Storage daemon
76     */
77    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
78       store->SDport);
79    sd = bnet_connect(jcr, retry_interval, max_retry_time,
80           _("Storage daemon"), store->address,
81           NULL, store->SDport, verbose);
82    if (sd == NULL) {
83       return false;
84    }
85    sd->res = (RES *)store;        /* save pointer to other end */
86    jcr->store_bsock = sd;
87
88    if (!authenticate_storage_daemon(jcr, store)) {
89       bnet_close(sd);
90       jcr->store_bsock = NULL;
91       return false;
92    }
93    return true;
94 }
95
96 /*
97  * Here we ask the SD to send us the info for a 
98  *  particular device resource.
99  */
100 #ifdef needed
101 bool update_device_res(JCR *jcr, DEVICE *dev)
102 {
103    POOL_MEM device_name; 
104    BSOCK *sd;
105    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
106       return false;
107    }
108    sd = jcr->store_bsock;
109    pm_strcpy(device_name, dev->hdr.name);
110    bash_spaces(device_name);
111    bnet_fsend(sd, query_device, device_name.c_str());
112    Dmsg1(100, ">stored: %s\n", sd->msg);
113    /* The data is returned through Device_update */
114    if (bget_dirmsg(sd) <= 0) {
115       return false;
116    }
117    return true;
118 }
119 #endif
120
121 /*
122  * Start a job with the Storage daemon
123  */
124 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
125 {
126    bool ok = true;
127    STORE *storage;
128    BSOCK *sd;
129    char auth_key[100];
130    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
131    int copy = 0;
132    int stripe = 0;
133
134    sd = jcr->store_bsock;
135    /*
136     * Now send JobId and permissions, and get back the authorization key.
137     */
138    bash_spaces(jcr->job->hdr.name);
139    bash_spaces(jcr->client->hdr.name);
140    bash_spaces(jcr->fileset->hdr.name);
141    if (jcr->fileset->MD5[0] == 0) {
142       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
143    }
144    /* If rescheduling, cancel the previous incarnation of this job
145     *  with the SD, which might be waiting on the FD connection.
146     *  If we do not cancel it the SD will not accept a new connection
147     *  for the same jobid.
148     */
149    if (jcr->reschedule_count) {
150       bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
151       while (bnet_recv(sd) >= 0)
152          { }
153    } 
154    bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
155               jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
156               jcr->fileset->hdr.name, !jcr->pool->catalog_files,
157               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
158               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
159    Dmsg1(100, ">stored: %s\n", sd->msg);
160    unbash_spaces(jcr->job->hdr.name);
161    unbash_spaces(jcr->client->hdr.name);
162    unbash_spaces(jcr->fileset->hdr.name);
163    if (bget_dirmsg(sd) > 0) {
164        Dmsg1(100, "<stored: %s", sd->msg);
165        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
166                   &jcr->VolSessionTime, &auth_key) != 3) {
167           Dmsg1(100, "BadJob=%s\n", sd->msg);
168           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
169           return 0;
170        } else {
171           jcr->sd_auth_key = bstrdup(auth_key);
172           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
173        }
174    } else {
175       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
176          bnet_strerror(sd));
177       return 0;
178    }
179
180    pm_strcpy(pool_type, jcr->pool->pool_type);
181    pm_strcpy(pool_name, jcr->pool->hdr.name);
182    bash_spaces(pool_type);
183    bash_spaces(pool_name);
184
185    /*
186     * We have two loops here. The first comes from the 
187     *  Storage = associated with the Job, and we need 
188     *  to attach to each one.
189     * The inner loop loops over all the alternative devices
190     *  associated with each Storage. It selects the first
191     *  available one.
192     *
193     */
194    /* Do read side of storage daemon */
195    if (ok && rstore) {
196       foreach_alist(storage, rstore) {
197          pm_strcpy(store_name, storage->hdr.name);
198          bash_spaces(store_name);
199          pm_strcpy(media_type, storage->media_type);
200          bash_spaces(media_type);
201          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
202                     pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
203
204          DEVICE *dev;
205          /* Loop over alternative storage Devices until one is OK */
206          foreach_alist(dev, storage->device) {
207             pm_strcpy(device_name, dev->hdr.name);
208             bash_spaces(device_name);
209             bnet_fsend(sd, use_device, device_name.c_str());
210             Dmsg1(100, ">stored: %s", sd->msg);
211          }
212          bnet_sig(sd, BNET_EOD);            /* end of Devices */
213       }
214       bnet_sig(sd, BNET_EOD);            /* end of Storages */
215       if (bget_dirmsg(sd) > 0) {
216          Dmsg1(100, "<stored: %s", sd->msg);
217          /* ****FIXME**** save actual device name */
218          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
219       } else {
220          POOL_MEM err_msg;
221          pm_strcpy(err_msg, sd->msg); /* save message */
222          Jmsg(jcr, M_FATAL, 0, _("\n"
223             "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
224             device_name.c_str(), err_msg.c_str()/* sd->msg */);
225          ok = false;
226       }
227    }
228
229    /* Do write side of storage daemon */
230    if (ok && wstore) {
231       foreach_alist(storage, wstore) {
232          pm_strcpy(store_name, storage->hdr.name);
233          bash_spaces(store_name);
234          pm_strcpy(media_type, storage->media_type);
235          bash_spaces(media_type);
236          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
237                     pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
238
239          DEVICE *dev;
240          /* Loop over alternative storage Devices until one is OK */
241          foreach_alist(dev, storage->device) {
242             pm_strcpy(device_name, dev->hdr.name);
243             bash_spaces(device_name);
244             bnet_fsend(sd, use_device, device_name.c_str());
245             Dmsg1(100, ">stored: %s", sd->msg);
246          }
247          bnet_sig(sd, BNET_EOD);            /* end of Devices */
248       }
249       bnet_sig(sd, BNET_EOD);            /* end of Storages */
250       if (bget_dirmsg(sd) > 0) {
251          Dmsg1(100, "<stored: %s", sd->msg);
252          /* ****FIXME**** save actual device name */
253          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
254       } else {
255          POOL_MEM err_msg;
256          pm_strcpy(err_msg, sd->msg); /* save message */
257          Jmsg(jcr, M_FATAL, 0, _("\n"
258             "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
259             device_name.c_str(), err_msg.c_str()/* sd->msg */);
260          ok = false;
261       }
262    }
263    return ok;
264 }
265
266 /*
267  * Start a thread to handle Storage daemon messages and
268  *  Catalog requests.
269  */
270 int start_storage_daemon_message_thread(JCR *jcr)
271 {
272    int status;
273    pthread_t thid;
274
275    jcr->inc_use_count();              /* mark in use by msg thread */
276    jcr->sd_msg_thread_done = false;
277    jcr->SD_msg_chan = 0;
278    Dmsg0(100, "Start SD msg_thread.\n");
279    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
280       berrno be;
281       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
282    }
283    /* Wait for thread to start */
284    while (jcr->SD_msg_chan == 0) {
285       bmicrosleep(0, 50);
286    }
287    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
288    return 1;
289 }
290
291 extern "C" void msg_thread_cleanup(void *arg)
292 {
293    JCR *jcr = (JCR *)arg;
294    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
295    jcr->sd_msg_thread_done = true;
296    jcr->SD_msg_chan = 0;
297    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
298    Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
299    free_jcr(jcr);                     /* release jcr */
300 }
301
302 /*
303  * Handle the message channel (i.e. requests from the
304  *  Storage daemon).
305  * Note, we are running in a separate thread.
306  */
307 extern "C" void *msg_thread(void *arg)
308 {
309    JCR *jcr = (JCR *)arg;
310    BSOCK *sd;
311    int JobStatus;
312    char Job[MAX_NAME_LENGTH];
313    uint32_t JobFiles;
314    uint64_t JobBytes;
315    int stat;
316
317    pthread_detach(pthread_self());
318    jcr->SD_msg_chan = pthread_self();
319    pthread_cleanup_push(msg_thread_cleanup, arg);
320    sd = jcr->store_bsock;
321
322    /* Read the Storage daemon's output.
323     */
324    Dmsg0(100, "Start msg_thread loop\n");
325    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
326       Dmsg1(400, "<stored: %s", sd->msg);
327       if (sscanf(sd->msg, Job_start, Job) == 1) {
328          continue;
329       }
330       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
331                  &JobBytes)) == 4) {
332          jcr->SDJobStatus = JobStatus; /* termination status */
333          jcr->SDJobFiles = JobFiles;
334          jcr->SDJobBytes = JobBytes;
335          break;
336       }
337       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
338    }
339    if (is_bnet_error(sd)) {
340       jcr->SDJobStatus = JS_ErrorTerminated;
341    }
342    pthread_cleanup_pop(1);            /* remove and execute the handler */
343    return NULL;
344 }
345
346 void wait_for_storage_daemon_termination(JCR *jcr)
347 {
348    int cancel_count = 0;
349    /* Now wait for Storage daemon to terminate our message thread */
350    while (!jcr->sd_msg_thread_done) {
351       struct timeval tv;
352       struct timezone tz;
353       struct timespec timeout;
354
355       gettimeofday(&tv, &tz);
356       timeout.tv_nsec = 0;
357       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
358       Dmsg0(400, "I'm waiting for message thread termination.\n");
359       P(mutex);
360       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
361       V(mutex);
362       if (job_canceled(jcr)) {
363          if (jcr->SD_msg_chan) {
364             jcr->store_bsock->timed_out = 1;
365             jcr->store_bsock->terminated = 1;
366             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
367             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
368          }
369          cancel_count++;
370       }
371       /* Give SD 30 seconds to clean up after cancel */
372       if (cancel_count == 6) {
373          break;
374       }
375    }
376    set_jcr_job_status(jcr, JS_Terminated);
377 }
378
379 #ifdef needed
380 #define MAX_TRIES 30
381 #define WAIT_TIME 2
382 extern "C" void *device_thread(void *arg)
383 {
384    int i;
385    JCR *jcr;
386    DEVICE *dev;
387
388
389    pthread_detach(pthread_self());
390    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
391    for (i=0; i < MAX_TRIES; i++) {
392       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
393          Dmsg0(900, "Failed connecting to SD.\n");
394          continue;
395       }
396       LockRes();
397       foreach_res(dev, R_DEVICE) {
398          if (!update_device_res(jcr, dev)) {
399             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
400          } else {
401             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
402          }
403       }
404       UnlockRes();
405       bnet_close(jcr->store_bsock);
406       jcr->store_bsock = NULL;
407       break;
408
409    }
410    free_jcr(jcr);
411    return NULL;
412 }
413
414 /*
415  * Start a thread to handle getting Device resource information
416  *  from SD. This is called once at startup of the Director.
417  */
418 void init_device_resources()
419 {
420    int status;
421    pthread_t thid;
422
423    Dmsg0(100, "Start Device thread.\n");
424    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
425       berrno be;
426       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
427    }
428 }
429 #endif