]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
9da9150771622c76305603fcbdca15b57bf555cd
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Copyright (C) 2000-2005 Kern Sibbald
20
21    This program is free software; you can redistribute it and/or
22    modify it under the terms of the GNU General Public License
23    version 2 as amended with additional clauses defined in the
24    file LICENSE in the main source directory.
25
26    This program is distributed in the hope that it will be useful,
27    but WITHOUT ANY WARRANTY; without even the implied warranty of
28    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
29    the file LICENSE for additional details.
30
31  */
32
33 #include "bacula.h"
34 #include "dird.h"
35
36 /* Commands sent to Storage daemon */
37 static char jobcmd[]     = "JobId=%d job=%s job_name=%s client_name=%s "
38    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
39    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
40 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
41    "pool_type=%s append=%d copy=%d stripe=%d\n";
42 static char use_device[] = "use device=%s\n";
43 //static char query_device[] = _("query device=%s");
44
45 /* Response from Storage daemon */
46 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
47 static char OK_device[]  = "3000 OK use device device=%s\n";
48
49 /* Storage Daemon requests */
50 static char Job_start[]  = "3010 Job %127s start\n";
51 static char Job_end[]    =
52    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
53
54 /* Forward referenced functions */
55 extern "C" void *msg_thread(void *arg);
56
57 /*
58  * Establish a message channel connection with the Storage daemon
59  * and perform authentication.
60  */
61 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
62                               int max_retry_time, int verbose)
63 {
64    BSOCK *sd;
65    STORE *store;
66
67    if (jcr->store_bsock) {
68       return true;                    /* already connected */
69    }
70    store = (STORE *)jcr->storage->first();
71
72    /*
73     *  Open message channel with the Storage daemon
74     */
75    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
76       store->SDport);
77    sd = bnet_connect(jcr, retry_interval, max_retry_time,
78           _("Storage daemon"), store->address,
79           NULL, store->SDport, verbose);
80    if (sd == NULL) {
81       return false;
82    }
83    sd->res = (RES *)store;        /* save pointer to other end */
84    jcr->store_bsock = sd;
85
86    if (!authenticate_storage_daemon(jcr, store)) {
87       bnet_close(sd);
88       jcr->store_bsock = NULL;
89       return false;
90    }
91    return true;
92 }
93
94 /*
95  * Here we ask the SD to send us the info for a 
96  *  particular device resource.
97  */
98 #ifdef needed
99 bool update_device_res(JCR *jcr, DEVICE *dev)
100 {
101    POOL_MEM device_name; 
102    BSOCK *sd;
103    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
104       return false;
105    }
106    sd = jcr->store_bsock;
107    pm_strcpy(device_name, dev->hdr.name);
108    bash_spaces(device_name);
109    bnet_fsend(sd, query_device, device_name.c_str());
110    Dmsg1(100, ">stored: %s\n", sd->msg);
111    /* The data is returned through Device_update */
112    if (bget_dirmsg(sd) <= 0) {
113       return false;
114    }
115    return true;
116 }
117 #endif
118
119 /*
120  * Start a job with the Storage daemon
121  */
122 int start_storage_daemon_job(JCR *jcr, alist *store, int append)
123 {
124    bool ok = false;
125    STORE *storage;
126    BSOCK *sd;
127    char auth_key[100];
128    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
129    char PoolId[50];
130    int copy = 0;
131    int stripe = 0;
132
133    sd = jcr->store_bsock;
134    /*
135     * Now send JobId and permissions, and get back the authorization key.
136     */
137    bash_spaces(jcr->job->hdr.name);
138    bash_spaces(jcr->client->hdr.name);
139    bash_spaces(jcr->fileset->hdr.name);
140    if (jcr->fileset->MD5[0] == 0) {
141       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
142    }
143    bnet_fsend(sd, jobcmd, jcr->JobId, jcr->Job, jcr->job->hdr.name,
144               jcr->client->hdr.name, jcr->JobType, jcr->JobLevel,
145               jcr->fileset->hdr.name, !jcr->pool->catalog_files,
146               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
147               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
148    Dmsg1(100, ">stored: %s\n", sd->msg);
149    unbash_spaces(jcr->job->hdr.name);
150    unbash_spaces(jcr->client->hdr.name);
151    unbash_spaces(jcr->fileset->hdr.name);
152    if (bget_dirmsg(sd) > 0) {
153        Dmsg1(100, "<stored: %s", sd->msg);
154        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
155                   &jcr->VolSessionTime, &auth_key) != 3) {
156           Dmsg1(100, "BadJob=%s\n", sd->msg);
157           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
158           return 0;
159        } else {
160           jcr->sd_auth_key = bstrdup(auth_key);
161           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
162        }
163    } else {
164       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
165          bnet_strerror(sd));
166       return 0;
167    }
168
169    pm_strcpy(pool_type, jcr->pool->pool_type);
170    pm_strcpy(pool_name, jcr->pool->hdr.name);
171    bash_spaces(pool_type);
172    bash_spaces(pool_name);
173    edit_int64(jcr->PoolId, PoolId);
174
175    /*
176     * We have two loops here. The first comes from the 
177     *  Storage = associated with the Job, and we need 
178     *  to attach to each one.
179     * The inner loop loops over all the alternative devices
180     *  associated with each Storage. It selects the first
181     *  available one.
182     *
183     * Note, the outer loop is not yet implemented.
184     */
185    foreach_alist(storage, store) {
186 //    storage = (STORE *)store->first();
187       pm_strcpy(store_name, storage->hdr.name);
188       bash_spaces(store_name);
189       pm_strcpy(media_type, storage->media_type);
190       bash_spaces(media_type);
191       bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
192                  pool_name.c_str(), pool_type.c_str(), append, copy, stripe);
193
194       DEVICE *dev;
195       /* Loop over alternative storage Devices until one is OK */
196       foreach_alist(dev, storage->device) {
197          pm_strcpy(device_name, dev->hdr.name);
198          bash_spaces(device_name);
199          bnet_fsend(sd, use_device, device_name.c_str());
200          Dmsg1(100, ">stored: %s", sd->msg);
201       }
202       bnet_sig(sd, BNET_EOD);            /* end of Devices */
203       bnet_sig(sd, BNET_EOD);            /* end of Storages */
204       if (bget_dirmsg(sd) > 0) {
205          Dmsg1(100, "<stored: %s", sd->msg);
206          /* ****FIXME**** save actual device name */
207          ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
208       } else {
209          POOL_MEM err_msg;
210          pm_strcpy(err_msg, sd->msg); /* save message */
211          Jmsg(jcr, M_FATAL, 0, _("\n"
212             "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
213             device_name.c_str(), err_msg.c_str()/* sd->msg */);
214       }
215       break;
216    }
217    if (ok) {
218       ok = bnet_fsend(sd, "run");
219       Dmsg1(100, ">stored: %s\n", sd->msg);
220    }
221    return ok;
222 }
223
224 /*
225  * Start a thread to handle Storage daemon messages and
226  *  Catalog requests.
227  */
228 int start_storage_daemon_message_thread(JCR *jcr)
229 {
230    int status;
231    pthread_t thid;
232
233    P(jcr->mutex);
234    jcr->use_count++;                  /* mark in use by msg thread */
235    jcr->sd_msg_thread_done = false;
236    jcr->SD_msg_chan = 0;
237    V(jcr->mutex);
238    Dmsg0(100, "Start SD msg_thread.\n");
239    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
240       berrno be;
241       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
242    }
243    Dmsg0(100, "SD msg_thread started.\n");
244    /* Wait for thread to start */
245    while (jcr->SD_msg_chan == 0) {
246       bmicrosleep(0, 50);
247    }
248    return 1;
249 }
250
251 extern "C" void msg_thread_cleanup(void *arg)
252 {
253    JCR *jcr = (JCR *)arg;
254    Dmsg0(200, "End msg_thread\n");
255    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
256    P(jcr->mutex);
257    jcr->sd_msg_thread_done = true;
258    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
259    jcr->SD_msg_chan = 0;
260    V(jcr->mutex);
261    free_jcr(jcr);                     /* release jcr */
262 }
263
264 /*
265  * Handle the message channel (i.e. requests from the
266  *  Storage daemon).
267  * Note, we are running in a separate thread.
268  */
269 extern "C" void *msg_thread(void *arg)
270 {
271    JCR *jcr = (JCR *)arg;
272    BSOCK *sd;
273    int JobStatus;
274    char Job[MAX_NAME_LENGTH];
275    uint32_t JobFiles;
276    uint64_t JobBytes;
277    int stat;
278
279    pthread_detach(pthread_self());
280    jcr->SD_msg_chan = pthread_self();
281    pthread_cleanup_push(msg_thread_cleanup, arg);
282    sd = jcr->store_bsock;
283
284    /* Read the Storage daemon's output.
285     */
286    Dmsg0(100, "Start msg_thread loop\n");
287    while ((stat=bget_dirmsg(sd)) >= 0) {
288       Dmsg1(200, "<stored: %s", sd->msg);
289       if (sscanf(sd->msg, Job_start, &Job) == 1) {
290          continue;
291       }
292       if (sscanf(sd->msg, Job_end, &Job, &JobStatus, &JobFiles,
293                  &JobBytes) == 4) {
294          jcr->SDJobStatus = JobStatus; /* termination status */
295          jcr->SDJobFiles = JobFiles;
296          jcr->SDJobBytes = JobBytes;
297          break;
298       }
299    }
300    if (is_bnet_error(sd)) {
301       jcr->SDJobStatus = JS_ErrorTerminated;
302    }
303    pthread_cleanup_pop(1);            /* remove and execute the handler */
304    return NULL;
305 }
306
307 void wait_for_storage_daemon_termination(JCR *jcr)
308 {
309    int cancel_count = 0;
310    /* Now wait for Storage daemon to terminate our message thread */
311    set_jcr_job_status(jcr, JS_WaitSD);
312    P(jcr->mutex);
313    while (!jcr->sd_msg_thread_done) {
314       struct timeval tv;
315       struct timezone tz;
316       struct timespec timeout;
317
318       gettimeofday(&tv, &tz);
319       timeout.tv_nsec = 0;
320       timeout.tv_sec = tv.tv_sec + 10; /* wait 10 seconds */
321       Dmsg0(300, "I'm waiting for message thread termination.\n");
322       pthread_cond_timedwait(&jcr->term_wait, &jcr->mutex, &timeout);
323       if (job_canceled(jcr)) {
324          cancel_count++;
325       }
326       /* Give SD 30 seconds to clean up after cancel */
327       if (cancel_count == 3) {
328          break;
329       }
330    }
331    V(jcr->mutex);
332    set_jcr_job_status(jcr, JS_Terminated);
333 }
334
335 #ifdef needed
336 #define MAX_TRIES 30
337 #define WAIT_TIME 2
338 extern "C" void *device_thread(void *arg)
339 {
340    int i;
341    JCR *jcr;
342    DEVICE *dev;
343
344
345    pthread_detach(pthread_self());
346    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
347    for (i=0; i < MAX_TRIES; i++) {
348       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
349          Dmsg0(000, "Failed connecting to SD.\n");
350          continue;
351       }
352       LockRes();
353       foreach_res(dev, R_DEVICE) {
354          if (!update_device_res(jcr, dev)) {
355             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
356          } else {
357             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
358          }
359       }
360       UnlockRes();
361       bnet_close(jcr->store_bsock);
362       jcr->store_bsock = NULL;
363       break;
364
365    }
366    free_jcr(jcr);
367    return NULL;
368 }
369
370 /*
371  * Start a thread to handle getting Device resource information
372  *  from SD. This is called once at startup of the Director.
373  */
374 void init_device_resources()
375 {
376    int status;
377    pthread_t thid;
378
379    Dmsg0(100, "Start Device thread.\n");
380    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
381       berrno be;
382       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
383    }
384 }
385 #endif