]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/stored/job.c
Unify the reserve_device() for a single device into one subroutine.
[bacula/bacula] / bacula / src / stored / job.c
1 /*
2  *   Job control and execution for Storage Daemon
3  *
4  *   Kern Sibbald, MM
5  *
6  *   Version $Id$
7  *
8  */
9 /*
10    Copyright (C) 2000-2005 Kern Sibbald
11
12    This program is free software; you can redistribute it and/or
13    modify it under the terms of the GNU General Public License
14    version 2 as ammended with additional clauses defined in the
15    file LICENSE in the main source directory.
16
17    This program is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
20    the file LICENSE for additional details.
21
22  */
23
24 #include "bacula.h"
25 #include "stored.h"
26
27 /* Imported variables */
28 extern uint32_t VolSessionTime;
29
30 /* Imported functions */
31 extern uint32_t newVolSessionId();
32
33 /* Forward referenced functions */
34 static bool use_storage_cmd(JCR *jcr);
35
36 /* Requests from the Director daemon */
37 static char jobcmd[] = "JobId=%d job=%127s job_name=%127s client_name=%127s "
38       "type=%d level=%d FileSet=%127s NoAttr=%d SpoolAttr=%d FileSetMD5=%127s "
39       "SpoolData=%d WritePartAfterJob=%d NewVol=%d\n";
40 static char use_storage[]  = "use storage=%127s media_type=%127s "
41    "pool_name=%127s pool_type=%127s append=%d copy=%d stripe=%d\n";
42 static char use_device[]  = "use device=%127s\n";
43 //static char query_device[] = "query device=%127s";
44
45
46 /* Responses sent to Director daemon */
47 static char OKjob[]     = "3000 OK Job SDid=%u SDtime=%u Authorization=%s\n";
48 static char OK_device[] = "3000 OK use device device=%s\n";
49 static char NO_device[] = "3924 Device \"%s\" not in SD Device resources.\n";
50 //static char NOT_open[]  = "3925 Device \"%s\" could not be opened or does not exist.\n";
51 static char BAD_use[]   = "3913 Bad use command: %s\n";
52 static char BAD_job[]   = "3915 Bad Job command: %s\n";
53 //static char OK_query[]  = "3001 OK query\n";
54 //static char NO_query[]  = "3918 Query failed\n";
55 //static char BAD_query[] = "3917 Bad query command: %s\n";
56
57 /*
58  * Director requests us to start a job
59  * Basic tasks done here:
60  *  - We pickup the JobId to be run from the Director.
61  *  - We pickup the device, media, and pool from the Director
62  *  - Wait for a connection from the File Daemon (FD)
63  *  - Accept commands from the FD (i.e. run the job)
64  *  - Return when the connection is terminated or
65  *    there is an error.
66  */
67 bool job_cmd(JCR *jcr)
68 {
69    int JobId;
70    char auth_key[100];
71    BSOCK *dir = jcr->dir_bsock;
72    POOL_MEM job_name, client_name, job, fileset_name, fileset_md5;
73    int JobType, level, spool_attributes, no_attributes, spool_data;
74    int write_part_after_job, NewVol;
75
76    JCR *ojcr;
77
78    /*
79     * Get JobId and permissions from Director
80     */
81    Dmsg1(100, "<dird: %s\n", dir->msg);
82    if (sscanf(dir->msg, jobcmd, &JobId, job.c_str(), job_name.c_str(),
83               client_name.c_str(),
84               &JobType, &level, fileset_name.c_str(), &no_attributes,
85               &spool_attributes, fileset_md5.c_str(), &spool_data, 
86               &write_part_after_job, &NewVol) != 13) {
87       pm_strcpy(jcr->errmsg, dir->msg);
88       bnet_fsend(dir, BAD_job, jcr->errmsg);
89       Dmsg1(100, ">dird: %s\n", dir->msg);
90       Emsg1(M_FATAL, 0, _("Bad Job Command from Director: %s\n"), jcr->errmsg);
91       set_jcr_job_status(jcr, JS_ErrorTerminated);
92       return false;
93    }
94    /*
95     * Since this job could be rescheduled, we
96     *  check to see if we have it already. If so
97     *  free the old jcr and use the new one.
98     */
99    ojcr = get_jcr_by_full_name(job.c_str());
100    if (ojcr && !ojcr->authenticated) {
101       Dmsg2(100, "Found ojcr=0x%x Job %s\n", (unsigned)(long)ojcr, job.c_str());
102       free_jcr(ojcr);
103    }
104    jcr->JobId = JobId;
105    jcr->VolSessionId = newVolSessionId();
106    jcr->VolSessionTime = VolSessionTime;
107    bstrncpy(jcr->Job, job, sizeof(jcr->Job));
108    unbash_spaces(job_name);
109    jcr->job_name = get_pool_memory(PM_NAME);
110    pm_strcpy(jcr->job_name, job_name);
111    unbash_spaces(client_name);
112    jcr->client_name = get_pool_memory(PM_NAME);
113    pm_strcpy(jcr->client_name, client_name);
114    unbash_spaces(fileset_name);
115    jcr->fileset_name = get_pool_memory(PM_NAME);
116    pm_strcpy(jcr->fileset_name, fileset_name);
117    jcr->JobType = JobType;
118    jcr->JobLevel = level;
119    jcr->no_attributes = no_attributes;
120    jcr->spool_attributes = spool_attributes;
121    jcr->spool_data = spool_data;
122    jcr->write_part_after_job = write_part_after_job;
123    jcr->fileset_md5 = get_pool_memory(PM_NAME);
124    pm_strcpy(jcr->fileset_md5, fileset_md5);
125    jcr->NewVolEachJob = NewVol;
126
127    jcr->authenticated = false;
128
129    /*
130     * Pass back an authorization key for the File daemon
131     */
132    make_session_key(auth_key, NULL, 1);
133    bnet_fsend(dir, OKjob, jcr->VolSessionId, jcr->VolSessionTime, auth_key);
134    Dmsg1(100, ">dird: %s", dir->msg);
135    jcr->sd_auth_key = bstrdup(auth_key);
136    memset(auth_key, 0, sizeof(auth_key));
137    generate_daemon_event(jcr, "JobStart");
138    return true;
139 }
140
141 bool use_cmd(JCR *jcr) 
142 {
143    /*
144     * Wait for the device, media, and pool information
145     */
146    if (!use_storage_cmd(jcr)) {
147       set_jcr_job_status(jcr, JS_ErrorTerminated);
148       memset(jcr->sd_auth_key, 0, strlen(jcr->sd_auth_key));
149       return false;
150    }
151    return true;
152 }
153
154 bool run_cmd(JCR *jcr)
155 {
156    struct timeval tv;
157    struct timezone tz;
158    struct timespec timeout;
159    int errstat;
160
161    Dmsg1(100, "Run_cmd: %s\n", jcr->dir_bsock->msg);
162    /* The following jobs don't need the FD */
163    switch (jcr->JobType) {
164    case JT_MIGRATION:
165    case JT_COPY:
166    case JT_ARCHIVE:
167       jcr->authenticated = true;
168       run_job(jcr);
169       return false;
170    }
171
172    set_jcr_job_status(jcr, JS_WaitFD);          /* wait for FD to connect */
173    dir_send_job_status(jcr);
174
175    gettimeofday(&tv, &tz);
176    timeout.tv_nsec = tv.tv_usec * 1000;
177    timeout.tv_sec = tv.tv_sec + 30 * 60;        /* wait 30 minutes */
178
179    Dmsg1(100, "%s waiting on FD to contact SD\n", jcr->Job);
180    /*
181     * Wait for the File daemon to contact us to start the Job,
182     *  when he does, we will be released, unless the 30 minutes
183     *  expires.
184     */
185    P(jcr->mutex);
186    for ( ;!job_canceled(jcr); ) {
187       errstat = pthread_cond_timedwait(&jcr->job_start_wait, &jcr->mutex, &timeout);
188       if (errstat == 0 || errstat == ETIMEDOUT) {
189          break;
190       }
191    }
192    V(jcr->mutex);
193
194    memset(jcr->sd_auth_key, 0, strlen(jcr->sd_auth_key));
195
196    if (jcr->authenticated && !job_canceled(jcr)) {
197       Dmsg1(100, "Running job %s\n", jcr->Job);
198       run_job(jcr);                   /* Run the job */
199    }
200    return false;
201 }
202
203 /*
204  * After receiving a connection (in job.c) if it is
205  *   from the File daemon, this routine is called.
206  */
207 void handle_filed_connection(BSOCK *fd, char *job_name)
208 {
209    JCR *jcr;
210
211    bmicrosleep(0, 50000);             /* wait 50 millisecs */
212    if (!(jcr=get_jcr_by_full_name(job_name))) {
213       Jmsg1(NULL, M_FATAL, 0, _("Job name not found: %s\n"), job_name);
214       Dmsg1(100, "Job name not found: %s\n", job_name);
215       return;
216    }
217
218    jcr->file_bsock = fd;
219    jcr->file_bsock->jcr = jcr;
220
221    Dmsg1(110, "Found Job %s\n", job_name);
222
223    if (jcr->authenticated) {
224       Jmsg2(jcr, M_FATAL, 0, "Hey!!!! JobId %u Job %s already authenticated.\n",
225          jcr->JobId, jcr->Job);
226       free_jcr(jcr);
227       return;
228    }
229
230    /*
231     * Authenticate the File daemon
232     */
233    if (jcr->authenticated || !authenticate_filed(jcr)) {
234       Dmsg1(100, "Authentication failed Job %s\n", jcr->Job);
235       Jmsg(jcr, M_FATAL, 0, _("Unable to authenticate File daemon\n"));
236    } else {
237       jcr->authenticated = true;
238       Dmsg1(110, "OK Authentication Job %s\n", jcr->Job);
239    }
240
241    P(jcr->mutex);
242    if (!jcr->authenticated) {
243       set_jcr_job_status(jcr, JS_ErrorTerminated);
244    }
245    pthread_cond_signal(&jcr->job_start_wait); /* wake waiting job */
246    V(jcr->mutex);
247    free_jcr(jcr);
248    return;
249 }
250
251
252 /*
253  *   Use Device command from Director
254  *   He tells is what Device Name to use, the Media Type,
255  *      the Pool Name, and the Pool Type.
256  *
257  *    Ensure that the device exists and is opened, then store
258  *      the media and pool info in the JCR.
259  */
260 class DIRSTORE {
261 public:
262    alist *device;
263    char name[MAX_NAME_LENGTH];
264    char media_type[MAX_NAME_LENGTH];
265    char pool_name[MAX_NAME_LENGTH];
266    char pool_type[MAX_NAME_LENGTH];
267 };
268
269 static int search_res_for_device(JCR *jcr, DIRSTORE *store, char *device_name, int append);
270 static int reserve_device(JCR *jcr, DIRSTORE *store, DEVRES *device, char *device_name, int append);
271
272 static bool use_storage_cmd(JCR *jcr)
273 {
274    POOL_MEM store_name, dev_name, media_type, pool_name, pool_type;
275    BSOCK *dir = jcr->dir_bsock;
276    int append;
277    bool ok;       
278    int Copy, Stripe;
279    alist *dirstore;   
280    DIRSTORE *store;
281    char *device_name;
282    DCR *dcr = NULL;
283    /*
284     * If there are multiple devices, the director sends us
285     *   use_device for each device that it wants to use.
286     */
287    Dmsg1(100, "<dird: %s", dir->msg);
288    dirstore = New(alist(10, not_owned_by_alist));
289    do {
290       ok = sscanf(dir->msg, use_storage, store_name.c_str(), 
291                   media_type.c_str(), pool_name.c_str(), 
292                   pool_type.c_str(), &append, &Copy, &Stripe) == 7;
293       if (!ok) {
294          break;
295       }
296       unbash_spaces(store_name);
297       unbash_spaces(media_type);
298       unbash_spaces(pool_name);
299       unbash_spaces(pool_type);
300       store = new DIRSTORE;
301       dirstore->append(store);
302       memset(store, 0, sizeof(DIRSTORE));
303       store->device = New(alist(10));
304       bstrncpy(store->name, store_name, sizeof(store->name));
305       bstrncpy(store->media_type, media_type, sizeof(store->media_type));
306       bstrncpy(store->pool_name, pool_name, sizeof(store->pool_name));
307       bstrncpy(store->pool_type, pool_type, sizeof(store->pool_type));
308
309       /* Now get all devices */
310       while (bnet_recv(dir) >= 0) {
311          ok = sscanf(dir->msg, use_device, dev_name.c_str()) == 1;
312          if (!ok) {
313             break;
314          }
315          unbash_spaces(dev_name);
316          store->device->append(bstrdup(dev_name.c_str()));
317       }
318    }  while (ok && bnet_recv(dir) >= 0);
319
320 #ifdef DEVELOPER
321    /* This loop is debug code and can be removed */
322    /* ***FIXME**** remove after 1.38 release */
323    foreach_alist(store, dirstore) {
324       Dmsg4(100, "Storage=%s media_type=%s pool=%s pool_type=%s\n", 
325          store->name, store->media_type, store->pool_name, 
326          store->pool_type);
327       foreach_alist(device_name, store->device) {
328          Dmsg1(100, "   Device=%s\n", device_name);
329       }
330    }
331 #endif
332
333    /*                    
334     * At this point, we have a list of all the Director's Storage
335     *  resources indicated for this Job, which include Pool, PoolType,
336     *  storage name, and Media type.     
337     * Then for each of the Storage resources, we have a list of
338     *  device names that were given.
339     *
340     * Wiffle through them and find one that can do the backup.
341     */
342    if (ok) {
343       bool first = true;
344       init_jcr_device_wait_timers(jcr);
345       for ( ;; ) {
346          int need_wait = false;
347          foreach_alist(store, dirstore) {
348             foreach_alist(device_name, store->device) {
349                int stat;
350                stat = search_res_for_device(jcr, store, device_name, append);
351                if (stat == 1) {             /* found available device */
352                   dcr = jcr->dcr;
353                   dcr->Copy = Copy;
354                   dcr->Stripe = Stripe;
355                   ok = true;
356                   goto done;
357                } else if (stat == 0) {      /* device busy */
358                   need_wait = true;
359                }
360             }
361          }
362          /*
363           * If there is some device for which we can wait, then
364           *  wait and try again until the wait time expires
365           */
366          if (!need_wait || !wait_for_device(jcr, jcr->errmsg, first)) {
367             break;
368          }
369          first = false;
370       }
371       if (verbose) {
372          unbash_spaces(dir->msg);
373          pm_strcpy(jcr->errmsg, dir->msg);
374          Jmsg(jcr, M_INFO, 0, _("Failed command: %s\n"), jcr->errmsg);
375       }
376       Jmsg(jcr, M_FATAL, 0, _("\n"
377          "     Device \"%s\" with MediaType \"%s\" requested by DIR not found in SD Device resources.\n"),
378            dev_name.c_str(), media_type.c_str());
379       bnet_fsend(dir, NO_device, dev_name.c_str());
380       Dmsg1(100, ">dird: %s\n", dir->msg);
381       ok = false;
382    } else {
383       unbash_spaces(dir->msg);
384       pm_strcpy(jcr->errmsg, dir->msg);
385       if (verbose) {
386          Jmsg(jcr, M_INFO, 0, _("Failed command: %s\n"), jcr->errmsg);
387       }
388       Jmsg(jcr, M_FATAL, 0, _("Bad Use Device command: %s\n"), jcr->errmsg);
389       bnet_fsend(dir, BAD_use, jcr->errmsg);
390       Dmsg1(100, ">dird: %s\n", dir->msg);
391       ok = false;
392    }
393
394 done:
395    foreach_alist(store, dirstore) {
396       delete store->device;
397       delete store;
398    }
399    delete dirstore;
400    if (!ok && dcr) {
401       free_dcr(dcr);
402    }
403    return ok;
404 }
405
406 static int search_res_for_device(JCR *jcr, DIRSTORE *store, char *device_name, int append)
407 {
408    DEVRES *device;
409    AUTOCHANGER *changer;
410    BSOCK *dir = jcr->dir_bsock;
411    bool ok;
412    int stat;
413
414    Dmsg1(100, "Search res for %s\n", device_name);
415    foreach_res(device, R_DEVICE) {
416       Dmsg1(100, "Try res=%s\n", device->hdr.name);
417       /* Find resource, and make sure we were able to open it */
418       if (fnmatch(device_name, device->hdr.name, 0) == 0 &&
419           strcmp(device->media_type, store->media_type) == 0) {
420          stat = reserve_device(jcr, store, device, device_name, append);
421          if (stat != 1) {
422             return stat;
423          }
424          Dmsg1(220, "Got: %s", dir->msg);
425          bash_spaces(device_name);
426          ok = bnet_fsend(dir, OK_device, device_name);
427          Dmsg1(100, ">dird: %s\n", dir->msg);
428          return ok ? 1 : -1;
429       }
430    }
431    foreach_res(changer, R_AUTOCHANGER) {
432       Dmsg1(100, "Try changer res=%s\n", changer->hdr.name);
433       /* Find resource, and make sure we were able to open it */
434       if (fnmatch(device_name, changer->hdr.name, 0) == 0) {
435          /* Try each device in this AutoChanger */
436          foreach_alist(device, changer->device) {
437             Dmsg1(100, "Try changer device %s\n", device->hdr.name);
438             stat = reserve_device(jcr, store, device, device_name, append);
439             if (stat == -1) {            /* hard error */
440                return -1;
441             }
442             if (stat == 0) {             /* must wait, try next one */
443                continue;
444             }
445             POOL_MEM dev_name;
446             Dmsg1(100, "Device %s opened.\n", device_name);
447             pm_strcpy(dev_name, device->hdr.name);
448             bash_spaces(dev_name);
449             ok = bnet_fsend(dir, OK_device, dev_name.c_str());  /* Return real device name */
450             Dmsg1(100, ">dird: %s\n", dir->msg);
451             return ok ? 1 : -1;
452          }
453       }
454    }
455    return 0;                    /* nothing found */
456 }
457
458 /*
459  *  Returns: 1 -- OK, have DCR
460  *           0 -- must wait
461  *          -1 -- fatal error
462  */
463 static int reserve_device(JCR *jcr, DIRSTORE *store, DEVRES *device, char *device_name, int append)
464 {
465    bool ok;
466    DCR *dcr;
467    const int name_len = MAX_NAME_LENGTH;
468    if (!device->dev) {
469       device->dev = init_dev(jcr, NULL, device);
470    }
471    if (!device->dev) {
472       if (dev_cap(device->dev, CAP_AUTOCHANGER)) {
473         Jmsg(jcr, M_WARNING, 0, _("\n"
474            "     Device \"%s\" in changer \"%s\" requested by DIR could not be opened or does not exist.\n"),
475              device->hdr.name, device_name);
476       } else {
477          Jmsg(jcr, M_WARNING, 0, _("\n"
478             "     Device \"%s\" requested by DIR could not be opened or does not exist.\n"),
479               device_name);
480       }
481       return 0;
482    }  
483    Dmsg1(100, "Found device %s\n", device->hdr.name);
484    dcr = new_dcr(jcr, device->dev);
485    if (!dcr) {
486       BSOCK *dir = jcr->dir_bsock;
487       bnet_fsend(dir, _("3926 Could not get dcr for device: %s\n"), device_name);
488       Dmsg1(100, ">dird: %s\n", dir->msg);
489       return -1;
490    }
491    jcr->dcr = dcr;
492    bstrncpy(dcr->pool_name, store->pool_name, name_len);
493    bstrncpy(dcr->pool_type, store->pool_type, name_len);
494    bstrncpy(dcr->media_type, store->media_type, name_len);
495    bstrncpy(dcr->dev_name, device_name, name_len);
496    if (append == SD_APPEND) {
497       ok = reserve_device_for_append(dcr);
498    } else {
499       ok = reserve_device_for_read(dcr);
500    }
501    if (!ok) {
502       free_dcr(jcr->dcr);
503       return 0;
504    }
505    return 1;
506 }
507
508
509 #ifdef needed
510 /*
511  *   Query Device command from Director
512  *   Sends Storage Daemon's information on the device to the
513  *    caller (presumably the Director).
514  *   This command always returns "true" so that the line is
515  *    not closed on an error.
516  *
517  */
518 bool query_cmd(JCR *jcr)
519 {
520    POOL_MEM dev_name, VolumeName, MediaType, ChangerName;
521    BSOCK *dir = jcr->dir_bsock;
522    DEVRES *device;
523    AUTOCHANGER *changer;
524    bool ok;
525
526    Dmsg1(100, "Query_cmd: %s", dir->msg);
527    ok = sscanf(dir->msg, query_device, dev_name.c_str()) == 1;
528    Dmsg1(100, "<dird: %s\n", dir->msg);
529    if (ok) {
530       unbash_spaces(dev_name);
531 //    LockRes();
532       foreach_res(device, R_DEVICE) {
533          /* Find resource, and make sure we were able to open it */
534          if (fnmatch(dev_name.c_str(), device->hdr.name, 0) == 0) {
535             if (!device->dev) {
536                device->dev = init_dev(jcr, NULL, device);
537             }
538             if (!device->dev) {
539                break;
540             }  
541 //          UnlockRes();
542             ok = dir_update_device(jcr, device->dev);
543             if (ok) {
544                ok = bnet_fsend(dir, OK_query);
545             } else {
546                bnet_fsend(dir, NO_query);
547             }
548             return ok;
549          }
550       }
551       foreach_res(changer, R_AUTOCHANGER) {
552          /* Find resource, and make sure we were able to open it */
553          if (fnmatch(dev_name.c_str(), changer->hdr.name, 0) == 0) {
554 //          UnlockRes();
555             if (!changer->device || changer->device->size() == 0) {
556                continue;              /* no devices */
557             }
558             ok = dir_update_changer(jcr, changer);
559             if (ok) {
560                ok = bnet_fsend(dir, OK_query);
561             } else {
562                bnet_fsend(dir, NO_query);
563             }
564             return ok;
565          }
566       }
567       /* If we get here, the device/autochanger was not found */
568 //    UnlockRes();
569       unbash_spaces(dir->msg);
570       pm_strcpy(jcr->errmsg, dir->msg);
571       bnet_fsend(dir, NO_device, dev_name.c_str());
572       Dmsg1(100, ">dird: %s\n", dir->msg);
573    } else {
574       unbash_spaces(dir->msg);
575       pm_strcpy(jcr->errmsg, dir->msg);
576       bnet_fsend(dir, BAD_query, jcr->errmsg);
577       Dmsg1(100, ">dird: %s\n", dir->msg);
578    }
579
580    return true;
581 }
582
583 #endif
584
585
586 /*
587  * Destroy the Job Control Record and associated
588  * resources (sockets).
589  */
590 void stored_free_jcr(JCR *jcr)
591 {
592    if (jcr->file_bsock) {
593       bnet_close(jcr->file_bsock);
594       jcr->file_bsock = NULL;
595    }
596    if (jcr->job_name) {
597       free_pool_memory(jcr->job_name);
598    }
599    if (jcr->client_name) {
600       free_memory(jcr->client_name);
601       jcr->client_name = NULL;
602    }
603    if (jcr->fileset_name) {
604       free_memory(jcr->fileset_name);
605    }
606    if (jcr->fileset_md5) {
607       free_memory(jcr->fileset_md5);
608    }
609    if (jcr->bsr) {
610       free_bsr(jcr->bsr);
611       jcr->bsr = NULL;
612    }
613    if (jcr->RestoreBootstrap) {
614       unlink(jcr->RestoreBootstrap);
615       free_pool_memory(jcr->RestoreBootstrap);
616       jcr->RestoreBootstrap = NULL;
617    }
618    if (jcr->next_dev || jcr->prev_dev) {
619       Emsg0(M_FATAL, 0, _("In free_jcr(), but still attached to device!!!!\n"));
620    }
621    pthread_cond_destroy(&jcr->job_start_wait);
622    if (jcr->dcrs) {
623       delete jcr->dcrs;
624    }
625    jcr->dcrs = NULL;
626    if (jcr->dcr) {
627       free_dcr(jcr->dcr);
628       jcr->dcr = NULL;
629    }
630    return;
631 }