]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
03Dec05
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Copyright (C) 2000-2005 Kern Sibbald
20
21    This program is free software; you can redistribute it and/or
22    modify it under the terms of the GNU General Public License
23    version 2 as amended with additional clauses defined in the
24    file LICENSE in the main source directory.
25
26    This program is distributed in the hope that it will be useful,
27    but WITHOUT ANY WARRANTY; without even the implied warranty of
28    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
29    the file LICENSE for additional details.
30
31  */
32
33 #include "bacula.h"
34 #include "dird.h"
35
36 /* Commands sent to Storage daemon */
37 static char jobcmd[]     = "JobId=%d job=%s job_name=%s client_name=%s "
38    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
39    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
40 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
41    "pool_type=%s append=%d copy=%d stripe=%d\n";
42 static char use_device[] = "use device=%s\n";
43 //static char query_device[] = _("query device=%s");
44
45 /* Response from Storage daemon */
46 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
47 static char OK_device[]  = "3000 OK use device device=%s\n";
48
49 /* Storage Daemon requests */
50 static char Job_start[]  = "3010 Job %127s start\n";
51 static char Job_end[]    =
52    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
53
54 /* Forward referenced functions */
55 extern "C" void *msg_thread(void *arg);
56
57 /*
58  * Establish a message channel connection with the Storage daemon
59  * and perform authentication.
60  */
61 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
62                               int max_retry_time, int verbose)
63 {
64    BSOCK *sd;
65    STORE *store;
66
67    if (jcr->store_bsock) {
68       return true;                    /* already connected */
69    }
70    store = (STORE *)jcr->storage->first();
71
72    /*
73     *  Open message channel with the Storage daemon
74     */
75    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
76       store->SDport);
77    sd = bnet_connect(jcr, retry_interval, max_retry_time,
78           _("Storage daemon"), store->address,
79           NULL, store->SDport, verbose);
80    if (sd == NULL) {
81       return false;
82    }
83    sd->res = (RES *)store;        /* save pointer to other end */
84    jcr->store_bsock = sd;
85
86    if (!authenticate_storage_daemon(jcr, store)) {
87       bnet_close(sd);
88       jcr->store_bsock = NULL;
89       return false;
90    }
91    return true;
92 }
93
94 /*
95  * Here we ask the SD to send us the info for a 
96  *  particular device resource.
97  */
98 #ifdef needed
99 bool update_device_res(JCR *jcr, DEVICE *dev)
100 {
101    POOL_MEM device_name; 
102    BSOCK *sd;
103    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
104       return false;
105    }
106    sd = jcr->store_bsock;
107    pm_strcpy(device_name, dev->hdr.name);
108    bash_spaces(device_name);
109    bnet_fsend(sd, query_device, device_name.c_str());
110    Dmsg1(100, ">stored: %s\n", sd->msg);
111    /* The data is returned through Device_update */
112    if (bget_dirmsg(sd) <= 0) {
113       return false;
114    }
115    return true;
116 }
117 #endif
118
119 /*
120  * Start a job with the Storage daemon
121  */
122 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
123 {
124    bool ok = true;
125    STORE *storage;
126    BSOCK *sd;
127    char auth_key[100];
128    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
129    int copy = 0;
130    int stripe = 0;
131
132    sd = jcr->store_bsock;
133    /*
134     * Now send JobId and permissions, and get back the authorization key.
135     */
136    bash_spaces(jcr->job->hdr.name);
137    bash_spaces(jcr->client->hdr.name);
138    bash_spaces(jcr->fileset->hdr.name);
139    if (jcr->fileset->MD5[0] == 0) {
140       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
141    }
142    bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
143               jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
144               jcr->fileset->hdr.name, !jcr->pool->catalog_files,
145               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
146               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
147    Dmsg1(100, ">stored: %s\n", sd->msg);
148    unbash_spaces(jcr->job->hdr.name);
149    unbash_spaces(jcr->client->hdr.name);
150    unbash_spaces(jcr->fileset->hdr.name);
151    if (bget_dirmsg(sd) > 0) {
152        Dmsg1(100, "<stored: %s", sd->msg);
153        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
154                   &jcr->VolSessionTime, &auth_key) != 3) {
155           Dmsg1(100, "BadJob=%s\n", sd->msg);
156           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
157           return 0;
158        } else {
159           jcr->sd_auth_key = bstrdup(auth_key);
160           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
161        }
162    } else {
163       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
164          bnet_strerror(sd));
165       return 0;
166    }
167
168    pm_strcpy(pool_type, jcr->pool->pool_type);
169    pm_strcpy(pool_name, jcr->pool->hdr.name);
170    bash_spaces(pool_type);
171    bash_spaces(pool_name);
172
173    /*
174     * We have two loops here. The first comes from the 
175     *  Storage = associated with the Job, and we need 
176     *  to attach to each one.
177     * The inner loop loops over all the alternative devices
178     *  associated with each Storage. It selects the first
179     *  available one.
180     *
181     */
182    /* Do read side of storage daemon */
183    if (ok && rstore) {
184       foreach_alist(storage, rstore) {
185          pm_strcpy(store_name, storage->hdr.name);
186          bash_spaces(store_name);
187          pm_strcpy(media_type, storage->media_type);
188          bash_spaces(media_type);
189          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
190                     pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
191
192          DEVICE *dev;
193          /* Loop over alternative storage Devices until one is OK */
194          foreach_alist(dev, storage->device) {
195             pm_strcpy(device_name, dev->hdr.name);
196             bash_spaces(device_name);
197             bnet_fsend(sd, use_device, device_name.c_str());
198             Dmsg1(100, ">stored: %s", sd->msg);
199          }
200          bnet_sig(sd, BNET_EOD);            /* end of Devices */
201          bnet_sig(sd, BNET_EOD);            /* end of Storages */
202          if (bget_dirmsg(sd) > 0) {
203             Dmsg1(100, "<stored: %s", sd->msg);
204             /* ****FIXME**** save actual device name */
205             ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
206          } else {
207             POOL_MEM err_msg;
208             pm_strcpy(err_msg, sd->msg); /* save message */
209             Jmsg(jcr, M_FATAL, 0, _("\n"
210                "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
211                device_name.c_str(), err_msg.c_str()/* sd->msg */);
212             ok = false;
213          }
214          break;
215       }
216    }
217
218    /* Do write side of storage daemon */
219    if (ok && wstore) {
220       foreach_alist(storage, wstore) {
221          pm_strcpy(store_name, storage->hdr.name);
222          bash_spaces(store_name);
223          pm_strcpy(media_type, storage->media_type);
224          bash_spaces(media_type);
225          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
226                     pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
227
228          DEVICE *dev;
229          /* Loop over alternative storage Devices until one is OK */
230          foreach_alist(dev, storage->device) {
231             pm_strcpy(device_name, dev->hdr.name);
232             bash_spaces(device_name);
233             bnet_fsend(sd, use_device, device_name.c_str());
234             Dmsg1(100, ">stored: %s", sd->msg);
235          }
236          bnet_sig(sd, BNET_EOD);            /* end of Devices */
237          bnet_sig(sd, BNET_EOD);            /* end of Storages */
238          if (bget_dirmsg(sd) > 0) {
239             Dmsg1(100, "<stored: %s", sd->msg);
240             /* ****FIXME**** save actual device name */
241             ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
242          } else {
243             POOL_MEM err_msg;
244             pm_strcpy(err_msg, sd->msg); /* save message */
245             Jmsg(jcr, M_FATAL, 0, _("\n"
246                "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
247                device_name.c_str(), err_msg.c_str()/* sd->msg */);
248             ok = false;
249          }
250          break;
251       }
252    }
253    if (ok) {
254       ok = bnet_fsend(sd, "run");
255       Dmsg1(100, ">stored: %s\n", sd->msg);
256    }
257    return ok;
258 }
259
260 /*
261  * Start a thread to handle Storage daemon messages and
262  *  Catalog requests.
263  */
264 int start_storage_daemon_message_thread(JCR *jcr)
265 {
266    int status;
267    pthread_t thid;
268
269    P(jcr->mutex);
270    jcr->use_count++;                  /* mark in use by msg thread */
271    jcr->sd_msg_thread_done = false;
272    jcr->SD_msg_chan = 0;
273    V(jcr->mutex);
274    Dmsg0(100, "Start SD msg_thread.\n");
275    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
276       berrno be;
277       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
278    }
279    Dmsg0(100, "SD msg_thread started.\n");
280    /* Wait for thread to start */
281    while (jcr->SD_msg_chan == 0) {
282       bmicrosleep(0, 50);
283    }
284    return 1;
285 }
286
287 extern "C" void msg_thread_cleanup(void *arg)
288 {
289    JCR *jcr = (JCR *)arg;
290    Dmsg0(200, "End msg_thread\n");
291    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
292    P(jcr->mutex);
293    jcr->sd_msg_thread_done = true;
294    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
295    jcr->SD_msg_chan = 0;
296    V(jcr->mutex);
297    free_jcr(jcr);                     /* release jcr */
298 }
299
300 /*
301  * Handle the message channel (i.e. requests from the
302  *  Storage daemon).
303  * Note, we are running in a separate thread.
304  */
305 extern "C" void *msg_thread(void *arg)
306 {
307    JCR *jcr = (JCR *)arg;
308    BSOCK *sd;
309    int JobStatus;
310    char Job[MAX_NAME_LENGTH];
311    uint32_t JobFiles;
312    uint64_t JobBytes;
313    int stat;
314
315    pthread_detach(pthread_self());
316    jcr->SD_msg_chan = pthread_self();
317    pthread_cleanup_push(msg_thread_cleanup, arg);
318    sd = jcr->store_bsock;
319
320    /* Read the Storage daemon's output.
321     */
322    Dmsg0(100, "Start msg_thread loop\n");
323    while ((stat=bget_dirmsg(sd)) >= 0) {
324       Dmsg1(200, "<stored: %s", sd->msg);
325       if (sscanf(sd->msg, Job_start, &Job) == 1) {
326          continue;
327       }
328       if (sscanf(sd->msg, Job_end, &Job, &JobStatus, &JobFiles,
329                  &JobBytes) == 4) {
330          jcr->SDJobStatus = JobStatus; /* termination status */
331          jcr->SDJobFiles = JobFiles;
332          jcr->SDJobBytes = JobBytes;
333          break;
334       }
335    }
336    if (is_bnet_error(sd)) {
337       jcr->SDJobStatus = JS_ErrorTerminated;
338    }
339    pthread_cleanup_pop(1);            /* remove and execute the handler */
340    return NULL;
341 }
342
343 void wait_for_storage_daemon_termination(JCR *jcr)
344 {
345    int cancel_count = 0;
346    /* Now wait for Storage daemon to terminate our message thread */
347    set_jcr_job_status(jcr, JS_WaitSD);
348    P(jcr->mutex);
349    while (!jcr->sd_msg_thread_done) {
350       struct timeval tv;
351       struct timezone tz;
352       struct timespec timeout;
353
354       gettimeofday(&tv, &tz);
355       timeout.tv_nsec = 0;
356       timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */
357       Dmsg0(300, "I'm waiting for message thread termination.\n");
358       pthread_cond_timedwait(&jcr->term_wait, &jcr->mutex, &timeout);
359       if (job_canceled(jcr)) {
360          cancel_count++;
361       }
362       /* Give SD 30 seconds to clean up after cancel */
363       if (cancel_count == 3) {
364          break;
365       }
366    }
367    V(jcr->mutex);
368    set_jcr_job_status(jcr, JS_Terminated);
369 }
370
371 #ifdef needed
372 #define MAX_TRIES 30
373 #define WAIT_TIME 2
374 extern "C" void *device_thread(void *arg)
375 {
376    int i;
377    JCR *jcr;
378    DEVICE *dev;
379
380
381    pthread_detach(pthread_self());
382    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
383    for (i=0; i < MAX_TRIES; i++) {
384       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
385          Dmsg0(000, "Failed connecting to SD.\n");
386          continue;
387       }
388       LockRes();
389       foreach_res(dev, R_DEVICE) {
390          if (!update_device_res(jcr, dev)) {
391             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
392          } else {
393             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
394          }
395       }
396       UnlockRes();
397       bnet_close(jcr->store_bsock);
398       jcr->store_bsock = NULL;
399       break;
400
401    }
402    free_jcr(jcr);
403    return NULL;
404 }
405
406 /*
407  * Start a thread to handle getting Device resource information
408  *  from SD. This is called once at startup of the Director.
409  */
410 void init_device_resources()
411 {
412    int status;
413    pthread_t thid;
414
415    Dmsg0(100, "Start Device thread.\n");
416    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
417       berrno be;
418       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
419    }
420 }
421 #endif