]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/dird/msgchan.c
This commit was manufactured by cvs2svn to create tag
[bacula/bacula] / bacula / src / dird / msgchan.c
1 /*
2  *
3  *   Bacula Director -- msgchan.c -- handles the message channel
4  *    to the Storage daemon and the File daemon.
5  *
6  *     Kern Sibbald, August MM
7  *
8  *    This routine runs as a thread and must be thread reentrant.
9  *
10  *  Basic tasks done here:
11  *    Open a message channel with the Storage daemon
12  *      to authenticate ourself and to pass the JobId.
13  *    Create a thread to interact with the Storage daemon
14  *      who returns a job status and requests Catalog services, etc.
15  *
16  *   Version $Id$
17  */
18 /*
19    Copyright (C) 2000-2006 Kern Sibbald
20
21    This program is free software; you can redistribute it and/or
22    modify it under the terms of the GNU General Public License
23    version 2 as amended with additional clauses defined in the
24    file LICENSE in the main source directory.
25
26    This program is distributed in the hope that it will be useful,
27    but WITHOUT ANY WARRANTY; without even the implied warranty of
28    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
29    the file LICENSE for additional details.
30
31  */
32
33 #include "bacula.h"
34 #include "dird.h"
35
36 static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
37
38 /* Commands sent to Storage daemon */
39 static char jobcmd[]     = "JobId=%s job=%s job_name=%s client_name=%s "
40    "type=%d level=%d FileSet=%s NoAttr=%d SpoolAttr=%d FileSetMD5=%s "
41    "SpoolData=%d WritePartAfterJob=%d PreferMountedVols=%d\n";
42 static char use_storage[] = "use storage=%s media_type=%s pool_name=%s "
43    "pool_type=%s append=%d copy=%d stripe=%d\n";
44 static char use_device[] = "use device=%s\n";
45 //static char query_device[] = _("query device=%s");
46
47 /* Response from Storage daemon */
48 static char OKjob[]      = "3000 OK Job SDid=%d SDtime=%d Authorization=%100s\n";
49 static char OK_device[]  = "3000 OK use device device=%s\n";
50
51 /* Storage Daemon requests */
52 static char Job_start[]  = "3010 Job %127s start\n";
53 static char Job_end[]    =
54    "3099 Job %127s end JobStatus=%d JobFiles=%d JobBytes=%" lld "\n";
55
56 /* Forward referenced functions */
57 extern "C" void *msg_thread(void *arg);
58
59 /*
60  * Establish a message channel connection with the Storage daemon
61  * and perform authentication.
62  */
63 bool connect_to_storage_daemon(JCR *jcr, int retry_interval,
64                               int max_retry_time, int verbose)
65 {
66    BSOCK *sd;
67    STORE *store;
68
69    if (jcr->store_bsock) {
70       return true;                    /* already connected */
71    }
72    store = (STORE *)jcr->storage->first();
73
74    /*
75     *  Open message channel with the Storage daemon
76     */
77    Dmsg2(100, "bnet_connect to Storage daemon %s:%d\n", store->address,
78       store->SDport);
79    sd = bnet_connect(jcr, retry_interval, max_retry_time,
80           _("Storage daemon"), store->address,
81           NULL, store->SDport, verbose);
82    if (sd == NULL) {
83       return false;
84    }
85    sd->res = (RES *)store;        /* save pointer to other end */
86    jcr->store_bsock = sd;
87
88    if (!authenticate_storage_daemon(jcr, store)) {
89       bnet_close(sd);
90       jcr->store_bsock = NULL;
91       return false;
92    }
93    return true;
94 }
95
96 /*
97  * Here we ask the SD to send us the info for a 
98  *  particular device resource.
99  */
100 #ifdef needed
101 bool update_device_res(JCR *jcr, DEVICE *dev)
102 {
103    POOL_MEM device_name; 
104    BSOCK *sd;
105    if (!connect_to_storage_daemon(jcr, 5, 30, 0)) {
106       return false;
107    }
108    sd = jcr->store_bsock;
109    pm_strcpy(device_name, dev->hdr.name);
110    bash_spaces(device_name);
111    bnet_fsend(sd, query_device, device_name.c_str());
112    Dmsg1(100, ">stored: %s\n", sd->msg);
113    /* The data is returned through Device_update */
114    if (bget_dirmsg(sd) <= 0) {
115       return false;
116    }
117    return true;
118 }
119 #endif
120
121 /*
122  * Start a job with the Storage daemon
123  */
124 bool start_storage_daemon_job(JCR *jcr, alist *rstore, alist *wstore)
125 {
126    bool ok = true;
127    STORE *storage;
128    BSOCK *sd;
129    char auth_key[100];
130    POOL_MEM store_name, device_name, pool_name, pool_type, media_type;
131    POOL_MEM job_name, client_name, fileset_name;
132    int copy = 0;
133    int stripe = 0;
134    char ed1[30];
135
136    sd = jcr->store_bsock;
137    /*
138     * Now send JobId and permissions, and get back the authorization key.
139     */
140    pm_strcpy(job_name, jcr->job->hdr.name);
141    bash_spaces(job_name);
142    pm_strcpy(client_name, jcr->client->hdr.name);
143    bash_spaces(client_name);
144    pm_strcpy(fileset_name, jcr->fileset->hdr.name);
145    bash_spaces(fileset_name);
146    if (jcr->fileset->MD5[0] == 0) {
147       bstrncpy(jcr->fileset->MD5, "**Dummy**", sizeof(jcr->fileset->MD5));
148    }
149    /* If rescheduling, cancel the previous incarnation of this job
150     *  with the SD, which might be waiting on the FD connection.
151     *  If we do not cancel it the SD will not accept a new connection
152     *  for the same jobid.
153     */
154    if (jcr->reschedule_count) {
155       bnet_fsend(sd, "cancel Job=%s\n", jcr->Job);
156       while (bnet_recv(sd) >= 0)
157          { }
158    } 
159    bnet_fsend(sd, jobcmd, edit_int64(jcr->JobId, ed1), jcr->Job, 
160               job_name.c_str(), client_name.c_str(), 
161               jcr->JobType, jcr->JobLevel,
162               fileset_name.c_str(), !jcr->pool->catalog_files,
163               jcr->job->SpoolAttributes, jcr->fileset->MD5, jcr->spool_data, 
164               jcr->write_part_after_job, jcr->job->PreferMountedVolumes);
165    Dmsg1(100, ">stored: %s\n", sd->msg);
166    if (bget_dirmsg(sd) > 0) {
167        Dmsg1(100, "<stored: %s", sd->msg);
168        if (sscanf(sd->msg, OKjob, &jcr->VolSessionId,
169                   &jcr->VolSessionTime, &auth_key) != 3) {
170           Dmsg1(100, "BadJob=%s\n", sd->msg);
171           Jmsg(jcr, M_FATAL, 0, _("Storage daemon rejected Job command: %s\n"), sd->msg);
172           return 0;
173        } else {
174           jcr->sd_auth_key = bstrdup(auth_key);
175           Dmsg1(150, "sd_auth_key=%s\n", jcr->sd_auth_key);
176        }
177    } else {
178       Jmsg(jcr, M_FATAL, 0, _("<stored: bad response to Job command: %s\n"),
179          bnet_strerror(sd));
180       return 0;
181    }
182
183    pm_strcpy(pool_type, jcr->pool->pool_type);
184    pm_strcpy(pool_name, jcr->pool->hdr.name);
185    bash_spaces(pool_type);
186    bash_spaces(pool_name);
187
188    /*
189     * We have two loops here. The first comes from the 
190     *  Storage = associated with the Job, and we need 
191     *  to attach to each one.
192     * The inner loop loops over all the alternative devices
193     *  associated with each Storage. It selects the first
194     *  available one.
195     *
196     */
197    /* Do read side of storage daemon */
198    if (ok && rstore) {
199       foreach_alist(storage, rstore) {
200          pm_strcpy(store_name, storage->hdr.name);
201          bash_spaces(store_name);
202          pm_strcpy(media_type, storage->media_type);
203          bash_spaces(media_type);
204          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
205                     pool_name.c_str(), pool_type.c_str(), 0, copy, stripe);
206
207          DEVICE *dev;
208          /* Loop over alternative storage Devices until one is OK */
209          foreach_alist(dev, storage->device) {
210             pm_strcpy(device_name, dev->hdr.name);
211             bash_spaces(device_name);
212             bnet_fsend(sd, use_device, device_name.c_str());
213             Dmsg1(100, ">stored: %s", sd->msg);
214          }
215          bnet_sig(sd, BNET_EOD);            /* end of Devices */
216          bnet_sig(sd, BNET_EOD);            /* end of Storages */
217          if (bget_dirmsg(sd) > 0) {
218             Dmsg1(100, "<stored: %s", sd->msg);
219             /* ****FIXME**** save actual device name */
220             ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
221          } else {
222             ok = false;
223          }
224          break;
225       }
226    }
227
228    /* Do write side of storage daemon */
229    if (ok && wstore) {
230       foreach_alist(storage, wstore) {
231          pm_strcpy(store_name, storage->hdr.name);
232          bash_spaces(store_name);
233          pm_strcpy(media_type, storage->media_type);
234          bash_spaces(media_type);
235          bnet_fsend(sd, use_storage, store_name.c_str(), media_type.c_str(), 
236                     pool_name.c_str(), pool_type.c_str(), 1, copy, stripe);
237
238          DEVICE *dev;
239          /* Loop over alternative storage Devices until one is OK */
240          foreach_alist(dev, storage->device) {
241             pm_strcpy(device_name, dev->hdr.name);
242             bash_spaces(device_name);
243             bnet_fsend(sd, use_device, device_name.c_str());
244             Dmsg1(100, ">stored: %s", sd->msg);
245          }
246          bnet_sig(sd, BNET_EOD);            /* end of Devices */
247          bnet_sig(sd, BNET_EOD);            /* end of Storages */
248          if (bget_dirmsg(sd) > 0) {
249             Dmsg1(100, "<stored: %s", sd->msg);
250             /* ****FIXME**** save actual device name */
251             ok = sscanf(sd->msg, OK_device, device_name.c_str()) == 1;
252          } else {
253             ok = false;
254          }
255          break;
256       }
257    }
258    if (!ok) {
259       POOL_MEM err_msg;
260       if (sd->msg[0]) {
261          pm_strcpy(err_msg, sd->msg); /* save message */
262          Jmsg(jcr, M_FATAL, 0, _("\n"
263               "     Storage daemon didn't accept Device \"%s\" because:\n     %s"),
264               device_name.c_str(), err_msg.c_str()/* sd->msg */);
265       } else { 
266          Jmsg(jcr, M_FATAL, 0, _("\n"
267               "     Storage daemon didn't accept Device \"%s\" command.\n"), 
268               device_name.c_str());
269       }
270    }
271    return ok;
272 }
273
274 /*
275  * Start a thread to handle Storage daemon messages and
276  *  Catalog requests.
277  */
278 int start_storage_daemon_message_thread(JCR *jcr)
279 {
280    int status;
281    pthread_t thid;
282
283    jcr->inc_use_count();              /* mark in use by msg thread */
284    jcr->sd_msg_thread_done = false;
285    jcr->SD_msg_chan = 0;
286    Dmsg0(100, "Start SD msg_thread.\n");
287    if ((status=pthread_create(&thid, NULL, msg_thread, (void *)jcr)) != 0) {
288       berrno be;
289       Jmsg1(jcr, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
290    }
291    /* Wait for thread to start */
292    while (jcr->SD_msg_chan == 0) {
293       bmicrosleep(0, 50);
294    }
295    Dmsg1(100, "SD msg_thread started. use=%d\n", jcr->use_count());
296    return 1;
297 }
298
299 extern "C" void msg_thread_cleanup(void *arg)
300 {
301    JCR *jcr = (JCR *)arg;
302    db_end_transaction(jcr, jcr->db);       /* terminate any open transaction */
303    jcr->sd_msg_thread_done = true;
304    jcr->SD_msg_chan = 0;
305    pthread_cond_broadcast(&jcr->term_wait); /* wakeup any waiting threads */
306    Dmsg1(100, "=== End msg_thread. use=%d\n", jcr->use_count());
307    free_jcr(jcr);                     /* release jcr */
308 }
309
310 /*
311  * Handle the message channel (i.e. requests from the
312  *  Storage daemon).
313  * Note, we are running in a separate thread.
314  */
315 extern "C" void *msg_thread(void *arg)
316 {
317    JCR *jcr = (JCR *)arg;
318    BSOCK *sd;
319    int JobStatus;
320    char Job[MAX_NAME_LENGTH];
321    uint32_t JobFiles;
322    uint64_t JobBytes;
323    int stat;
324
325    pthread_detach(pthread_self());
326    jcr->SD_msg_chan = pthread_self();
327    pthread_cleanup_push(msg_thread_cleanup, arg);
328    sd = jcr->store_bsock;
329
330    /* Read the Storage daemon's output.
331     */
332    Dmsg0(100, "Start msg_thread loop\n");
333    while (!job_canceled(jcr) && bget_dirmsg(sd) >= 0) {
334       Dmsg1(400, "<stored: %s", sd->msg);
335       if (sscanf(sd->msg, Job_start, Job) == 1) {
336          continue;
337       }
338       if ((stat=sscanf(sd->msg, Job_end, Job, &JobStatus, &JobFiles,
339                  &JobBytes)) == 4) {
340          jcr->SDJobStatus = JobStatus; /* termination status */
341          jcr->SDJobFiles = JobFiles;
342          jcr->SDJobBytes = JobBytes;
343          break;
344       }
345       Dmsg2(400, "end loop stat=%d use=%d\n", stat, jcr->use_count());
346    }
347    if (is_bnet_error(sd)) {
348       jcr->SDJobStatus = JS_ErrorTerminated;
349    }
350    pthread_cleanup_pop(1);            /* remove and execute the handler */
351    return NULL;
352 }
353
354 void wait_for_storage_daemon_termination(JCR *jcr)
355 {
356    int cancel_count = 0;
357    /* Now wait for Storage daemon to terminate our message thread */
358    while (!jcr->sd_msg_thread_done) {
359       struct timeval tv;
360       struct timezone tz;
361       struct timespec timeout;
362
363       gettimeofday(&tv, &tz);
364       timeout.tv_nsec = 0;
365       timeout.tv_sec = tv.tv_sec + 5; /* wait 5 seconds */
366       Dmsg0(400, "I'm waiting for message thread termination.\n");
367       P(mutex);
368       pthread_cond_timedwait(&jcr->term_wait, &mutex, &timeout);
369       V(mutex);
370       if (job_canceled(jcr)) {
371          if (jcr->SD_msg_chan) {
372             jcr->store_bsock->timed_out = 1;
373             jcr->store_bsock->terminated = 1;
374             Dmsg2(400, "kill jobid=%d use=%d\n", (int)jcr->JobId, jcr->use_count());
375             pthread_kill(jcr->SD_msg_chan, TIMEOUT_SIGNAL);
376          }
377          cancel_count++;
378       }
379       /* Give SD 30 seconds to clean up after cancel */
380       if (cancel_count == 6) {
381          break;
382       }
383    }
384    set_jcr_job_status(jcr, JS_Terminated);
385 }
386
387 #ifdef needed
388 #define MAX_TRIES 30
389 #define WAIT_TIME 2
390 extern "C" void *device_thread(void *arg)
391 {
392    int i;
393    JCR *jcr;
394    DEVICE *dev;
395
396
397    pthread_detach(pthread_self());
398    jcr = new_control_jcr("*DeviceInit*", JT_SYSTEM);
399    for (i=0; i < MAX_TRIES; i++) {
400       if (!connect_to_storage_daemon(jcr, 10, 30, 1)) {
401          Dmsg0(900, "Failed connecting to SD.\n");
402          continue;
403       }
404       LockRes();
405       foreach_res(dev, R_DEVICE) {
406          if (!update_device_res(jcr, dev)) {
407             Dmsg1(900, "Error updating device=%s\n", dev->hdr.name);
408          } else {
409             Dmsg1(900, "Updated Device=%s\n", dev->hdr.name);
410          }
411       }
412       UnlockRes();
413       bnet_close(jcr->store_bsock);
414       jcr->store_bsock = NULL;
415       break;
416
417    }
418    free_jcr(jcr);
419    return NULL;
420 }
421
422 /*
423  * Start a thread to handle getting Device resource information
424  *  from SD. This is called once at startup of the Director.
425  */
426 void init_device_resources()
427 {
428    int status;
429    pthread_t thid;
430
431    Dmsg0(100, "Start Device thread.\n");
432    if ((status=pthread_create(&thid, NULL, device_thread, NULL)) != 0) {
433       berrno be;
434       Jmsg1(NULL, M_ABORT, 0, _("Cannot create message thread: %s\n"), be.strerror(status));
435    }
436 }
437 #endif