2 * Job control and execution for Storage Daemon
10 Copyright (C) 2000-2005 Kern Sibbald
12 This program is free software; you can redistribute it and/or
13 modify it under the terms of the GNU General Public License
14 version 2 as amended with additional clauses defined in the
15 file LICENSE in the main source directory.
17 This program is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 file LICENSE for additional details.
/*
 * External symbols used by this file, and the forward declaration of the
 * file-local use_storage_cmd() helper. VolSessionTime and newVolSessionId()
 * are consumed by job_cmd() when it stamps a new session on the JCR.
 */
27 /* Imported variables */
28 extern uint32_t VolSessionTime;
30 /* Imported functions */
31 extern uint32_t newVolSessionId();
33 /* Forward referenced functions */
34 static bool use_storage_cmd(JCR *jcr);
/*
 * Wire-format templates for the conversation with the Director.
 *
 * The request templates are handed to sscanf() on dir->msg:
 *   jobcmd       13 conversions (job_cmd checks for 13)
 *   use_storage   7 conversions (use_storage_cmd checks for 7)
 *   use_device    1 conversion  (use_storage_cmd checks for 1)
 * Each %127s conversion stores at most 127 characters plus the
 * terminating NUL.
 */
36 /* Requests from the Director daemon */
37 static char jobcmd[] = "JobId=%d job=%127s job_name=%127s client_name=%127s "
38 "type=%d level=%d FileSet=%127s NoAttr=%d SpoolAttr=%d FileSetMD5=%127s "
39 "SpoolData=%d WritePartAfterJob=%d NewVol=%d\n";
40 static char use_storage[] = "use storage=%127s media_type=%127s "
41 "pool_name=%127s pool_type=%127s append=%d copy=%d stripe=%d\n";
42 static char use_device[] = "use device=%127s\n";
/*
 * NOTE(review): query_device (and OK_query, NO_query, BAD_query below) are
 * commented out, yet query_cmd() later in this file still references them.
 * Presumably query_cmd() is excluded from the build by a guard that is not
 * visible in this listing -- confirm before re-enabling either side.
 */
43 //static char query_device[] = "query device=%127s";
/* The response templates below are sent back with bnet_fsend(). */
46 /* Responses sent to Director daemon */
47 static char OKjob[] = "3000 OK Job SDid=%u SDtime=%u Authorization=%s\n";
48 static char OK_device[] = "3000 OK use device device=%s\n";
49 static char NO_device[] = "3924 Device \"%s\" not in SD Device resources.\n";
50 //static char NOT_open[] = "3925 Device \"%s\" could not be opened or does not exist.\n";
51 static char BAD_use[] = "3913 Bad use command: %s\n";
52 static char BAD_job[] = "3915 Bad Job command: %s\n";
53 //static char OK_query[] = "3001 OK query\n";
54 //static char NO_query[] = "3918 Query failed\n";
55 //static char BAD_query[] = "3917 Bad query command: %s\n";
58 * Director requests us to start a job
59 * Basic tasks done here:
60 * - We pickup the JobId to be run from the Director.
61 * - We pickup the device, media, and pool from the Director
62 * - Wait for a connection from the File Daemon (FD)
63 * - Accept commands from the FD (i.e. run the job)
64 * - Return when the connection is terminated or
/*
 * job_cmd -- handle the Job command sent by the Director.
 *
 * Parses dir->msg with the jobcmd template and expects 13 converted
 * fields. On a parse failure the raw message is copied to jcr->errmsg,
 * a BAD_job reply is sent, Emsg1 is called with M_FATAL and the job
 * status is set to JS_ErrorTerminated. Otherwise the parsed values are
 * stored in the JCR, a session key is generated and returned to the
 * Director in the OKjob reply, and a JobStart daemon event is generated.
 *
 * NOTE(review): this listing omits some original lines (the embedded
 * line numbers skip). The declarations of JobId, ojcr and auth_key, the
 * client_name argument of the sscanf call, and the return statements are
 * not visible here.
 */
67 bool job_cmd(JCR *jcr)
71 BSOCK *dir = jcr->dir_bsock;
72 POOL_MEM job_name, client_name, job, fileset_name, fileset_md5;
73 int JobType, level, spool_attributes, no_attributes, spool_data;
74 int write_part_after_job, NewVol;
79 * Get JobId and permissions from Director
81 Dmsg1(100, "<dird: %s\n", dir->msg);
82 if (sscanf(dir->msg, jobcmd, &JobId, job.c_str(), job_name.c_str(),
84 &JobType, &level, fileset_name.c_str(), &no_attributes,
85 &spool_attributes, fileset_md5.c_str(), &spool_data,
86 &write_part_after_job, &NewVol) != 13) {
/* Parse failure: save the message first because bnet_fsend reuses dir->msg. */
87 pm_strcpy(jcr->errmsg, dir->msg);
88 bnet_fsend(dir, BAD_job, jcr->errmsg);
89 Dmsg1(100, ">dird: %s\n", dir->msg);
90 Emsg1(M_FATAL, 0, _("Bad Job Command from Director: %s\n"), jcr->errmsg);
91 set_jcr_job_status(jcr, JS_ErrorTerminated);
95 * Since this job could be rescheduled, we
96 * check to see if we have it already. If so
97 * free the old jcr and use the new one.
99 ojcr = get_jcr_by_full_name(job.c_str());
/*
 * Only an older JCR that never authenticated is handled here.
 * NOTE(review): the statement that disposes of it is elided from this
 * listing; only the debug trace is visible.
 */
100 if (ojcr && !ojcr->authenticated) {
101 Dmsg2(100, "Found ojcr=0x%x Job %s\n", (unsigned)(long)ojcr, job.c_str());
105 jcr->VolSessionId = newVolSessionId();
106 jcr->VolSessionTime = VolSessionTime;
/*
 * Copy the parsed attributes into the JCR. Each name field is passed
 * through unbash_spaces() and then copied into pool memory freshly
 * obtained with get_pool_memory(PM_NAME).
 */
107 bstrncpy(jcr->Job, job, sizeof(jcr->Job));
108 unbash_spaces(job_name);
109 jcr->job_name = get_pool_memory(PM_NAME);
110 pm_strcpy(jcr->job_name, job_name);
111 unbash_spaces(client_name);
112 jcr->client_name = get_pool_memory(PM_NAME);
113 pm_strcpy(jcr->client_name, client_name);
114 unbash_spaces(fileset_name);
115 jcr->fileset_name = get_pool_memory(PM_NAME);
116 pm_strcpy(jcr->fileset_name, fileset_name);
117 jcr->JobType = JobType;
118 jcr->JobLevel = level;
119 jcr->no_attributes = no_attributes;
120 jcr->spool_attributes = spool_attributes;
121 jcr->spool_data = spool_data;
122 jcr->write_part_after_job = write_part_after_job;
123 jcr->fileset_md5 = get_pool_memory(PM_NAME);
124 pm_strcpy(jcr->fileset_md5, fileset_md5);
125 jcr->NewVolEachJob = NewVol;
/* The flag is set to true later by handle_filed_connection() or run_cmd(). */
127 jcr->authenticated = false;
130 * Pass back an authorization key for the File daemon
132 make_session_key(auth_key, NULL, 1);
133 bnet_fsend(dir, OKjob, jcr->VolSessionId, jcr->VolSessionTime, auth_key);
134 Dmsg1(100, ">dird: %s", dir->msg);
/* Keep a duplicate of the key in the JCR, then zero the local buffer. */
135 jcr->sd_auth_key = bstrdup(auth_key);
136 memset(auth_key, 0, sizeof(auth_key));
137 generate_daemon_event(jcr, "JobStart");
/*
 * use_cmd -- entry point for the use storage command.
 *
 * Delegates to use_storage_cmd(). When that fails, the job status is set
 * to JS_ErrorTerminated and the characters of jcr->sd_auth_key are zeroed
 * in place.
 *
 * NOTE(review): the return statements are elided from this listing.
 */
141 bool use_cmd(JCR *jcr)
144 * Wait for the device, media, and pool information
146 if (!use_storage_cmd(jcr)) {
147 set_jcr_job_status(jcr, JS_ErrorTerminated);
/* strlen bounds the wipe, so only the key characters are overwritten. */
148 memset(jcr->sd_auth_key, 0, strlen(jcr->sd_auth_key));
/*
 * run_cmd -- wait for the File daemon to connect, then run the job.
 *
 * Some job types are marked authenticated immediately (see the switch on
 * jcr->JobType; the case labels are elided from this listing). For the
 * others the job status is set to JS_WaitFD, the Director is informed,
 * and the thread waits on jcr->job_start_wait with a 30 minute deadline.
 * After the wait the session key is zeroed, and run_job() is called only
 * if the job is authenticated and has not been canceled.
 *
 * NOTE(review): declarations of tv, tz and errstat and the return
 * statements are not visible in this listing.
 */
154 bool run_cmd(JCR *jcr)
158 struct timespec timeout;
161 Dmsg1(100, "Run_cmd: %s\n", jcr->dir_bsock->msg);
162 /* The following jobs don't need the FD */
163 switch (jcr->JobType) {
167 jcr->authenticated = true;
172 set_jcr_job_status(jcr, JS_WaitFD); /* wait for FD to connect */
173 dir_send_job_status(jcr);
/*
 * Build the absolute deadline required by pthread_cond_timedwait():
 * current time plus 30 minutes, with microseconds converted to
 * nanoseconds.
 */
175 gettimeofday(&tv, &tz);
176 timeout.tv_nsec = tv.tv_usec * 1000;
177 timeout.tv_sec = tv.tv_sec + 30 * 60; /* wait 30 minutes */
179 Dmsg1(100, "%s waiting on FD to contact SD\n", jcr->Job);
181 * Wait for the File daemon to contact us to start the Job,
182 * when he does, we will be released, unless the 30 minutes
186 for ( ;!job_canceled(jcr); ) {
/*
 * job_start_wait is signalled by handle_filed_connection().
 * NOTE(review): pthread_cond_timedwait requires jcr->mutex to be held by
 * the caller; the lock and unlock calls are not visible in this listing.
 */
187 errstat = pthread_cond_timedwait(&jcr->job_start_wait, &jcr->mutex, &timeout);
188 if (errstat == 0 || errstat == ETIMEDOUT) {
194 memset(jcr->sd_auth_key, 0, strlen(jcr->sd_auth_key));
/* Run only when the File daemon authenticated and nobody canceled the job. */
196 if (jcr->authenticated && !job_canceled(jcr)) {
197 Dmsg1(100, "Running job %s\n", jcr->Job);
198 run_job(jcr); /* Run the job */
204 * After receiving a connection (in job.c) if it is
205 * from the File daemon, this routine is called.
/*
 * handle_filed_connection -- attach an incoming File daemon socket to
 * its job and authenticate it.
 *
 * fd        socket of the connecting File daemon
 * job_name  full job name used to look up the JCR
 *
 * After a 50 millisecond pause the JCR is looked up by full name; if it
 * is not found a fatal message is issued. Otherwise the socket is stored
 * in jcr->file_bsock, authenticate_filed() is called, and on success
 * jcr->authenticated is set. If the job ends up unauthenticated its
 * status is set to JS_ErrorTerminated. Finally job_start_wait is
 * signalled.
 *
 * NOTE(review): the declaration of jcr, the early returns and any
 * cleanup after the checks are elided from this listing.
 */
207 void handle_filed_connection(BSOCK *fd, char *job_name)
211 bmicrosleep(0, 50000); /* wait 50 millisecs */
212 if (!(jcr=get_jcr_by_full_name(job_name))) {
213 Jmsg1(NULL, M_FATAL, 0, _("Job name not found: %s\n"), job_name);
214 Dmsg1(100, "Job name not found: %s\n", job_name);
/* Link the socket and the JCR in both directions. */
218 jcr->file_bsock = fd;
219 jcr->file_bsock->jcr = jcr;
221 Dmsg1(110, "Found Job %s\n", job_name);
/* A second connection for an already authenticated job is reported as fatal. */
223 if (jcr->authenticated) {
224 Jmsg2(jcr, M_FATAL, 0, "Hey!!!! JobId %u Job %s already authenticated.\n",
225 jcr->JobId, jcr->Job);
231 * Authenticate the File daemon
233 if (jcr->authenticated || !authenticate_filed(jcr)) {
234 Dmsg1(100, "Authentication failed Job %s\n", jcr->Job);
235 Jmsg(jcr, M_FATAL, 0, _("Unable to authenticate File daemon\n"));
237 jcr->authenticated = true;
238 Dmsg1(110, "OK Authentication Job %s\n", jcr->Job);
242 if (!jcr->authenticated) {
243 set_jcr_job_status(jcr, JS_ErrorTerminated);
/* run_cmd() waits on this condition variable. */
245 pthread_cond_signal(&jcr->job_start_wait); /* wake waiting job */
253 * Use Device command from Director
254 * He tells us what Device Name to use, the Media Type,
255 * the Pool Name, and the Pool Type.
257 * Ensure that the device exists and is opened, then store
258 * the media and pool info in the JCR.
/*
 * Fixed-size string fields of the DIRSTORE record. use_storage_cmd()
 * fills them with bstrncpy() from one parsed use storage line.
 *
 * NOTE(review): the class header and the device list member (used as
 * store->device elsewhere in this file) are not visible in this listing.
 */
263 char name[MAX_NAME_LENGTH];
264 char media_type[MAX_NAME_LENGTH];
265 char pool_name[MAX_NAME_LENGTH];
266 char pool_type[MAX_NAME_LENGTH];
/*
 * Helpers defined later in this file. Both receive the append flag that
 * use_storage_cmd() parsed from the Director.
 */
269 static int search_res_for_device(JCR *jcr, DIRSTORE *store, char *device_name, int append);
270 static int reserve_device(JCR *jcr, DIRSTORE *store, DEVRES *device, char *device_name, int append);
/*
 * use_storage_cmd -- read the use storage and use device lines from the
 * Director and try to find a usable device.
 *
 * Phase 1: each use storage line (7 fields) produces one DIRSTORE that is
 * appended to dirstore; the use device lines that follow it (1 field
 * each) are duplicated into that store device list.
 * Phase 2: every device name of every store is offered to
 * search_res_for_device(), whose result is interpreted as 1 = available
 * device found, 0 = device busy.
 * If nothing was found, the code either waits through wait_for_device()
 * or reports failure: NO_device when a well-formed request could not be
 * satisfied, BAD_use when the command itself was malformed.
 * Finally the device list of each store is deleted.
 *
 * NOTE(review): many lines are elided from this listing (declarations of
 * ok, append, Copy, Stripe, store, dirstore, device_name, stat, first and
 * dcr, the loop headers around the wait logic, and all return
 * statements), so the exact control flow must be checked against the
 * full file.
 */
272 static bool use_storage_cmd(JCR *jcr)
274 POOL_MEM store_name, dev_name, media_type, pool_name, pool_type;
275 BSOCK *dir = jcr->dir_bsock;
284 * If there are multiple devices, the director sends us
285 * use_device for each device that it wants to use.
287 Dmsg1(100, "<dird: %s", dir->msg);
/*
 * The list is created with not_owned_by_alist; the cleanup loop at the
 * end of this function deletes each store device list explicitly.
 */
288 dirstore = New(alist(10, not_owned_by_alist));
290 ok = sscanf(dir->msg, use_storage, store_name.c_str(),
291 media_type.c_str(), pool_name.c_str(),
292 pool_type.c_str(), &append, &Copy, &Stripe) == 7;
296 unbash_spaces(store_name);
297 unbash_spaces(media_type);
298 unbash_spaces(pool_name);
299 unbash_spaces(pool_type);
/* Allocate a DIRSTORE, queue it, zero it, then give it a device-name list. */
300 store = new DIRSTORE;
301 dirstore->append(store);
302 memset(store, 0, sizeof(DIRSTORE));
303 store->device = New(alist(10));
304 bstrncpy(store->name, store_name, sizeof(store->name));
305 bstrncpy(store->media_type, media_type, sizeof(store->media_type));
306 bstrncpy(store->pool_name, pool_name, sizeof(store->pool_name));
307 bstrncpy(store->pool_type, pool_type, sizeof(store->pool_type));
309 /* Now get all devices */
310 while (bnet_recv(dir) >= 0) {
311 ok = sscanf(dir->msg, use_device, dev_name.c_str()) == 1;
315 unbash_spaces(dev_name);
/* The list stores a bstrdup copy, not the POOL_MEM buffer itself. */
316 store->device->append(bstrdup(dev_name.c_str()));
318 } while (ok && bnet_recv(dir) >= 0);
321 /* This loop is debug code and can be removed */
322 /* ***FIXME**** remove after 1.38 release */
323 foreach_alist(store, dirstore) {
324 Dmsg4(100, "Storage=%s media_type=%s pool=%s pool_type=%s\n",
325 store->name, store->media_type, store->pool_name,
327 foreach_alist(device_name, store->device) {
328 Dmsg1(100, " Device=%s\n", device_name);
334 * At this point, we have a list of all the Director's Storage
335 * resources indicated for this Job, which include Pool, PoolType,
336 * storage name, and Media type.
337 * Then for each of the Storage resources, we have a list of
338 * device names that were given.
340 * Wiffle through them and find one that can do the backup.
344 init_jcr_device_wait_timers(jcr);
346 int need_wait = false;
347 foreach_alist(store, dirstore) {
348 foreach_alist(device_name, store->device) {
350 stat = search_res_for_device(jcr, store, device_name, append);
351 if (stat == 1) { /* found available device */
354 dcr->Stripe = Stripe;
357 } else if (stat == 0) { /* device busy */
363 * If there is some device for which we can wait, then
364 * wait and try again until the wait time expires
366 if (!need_wait || !wait_for_device(jcr, jcr->errmsg, first)) {
372 unbash_spaces(dir->msg);
373 pm_strcpy(jcr->errmsg, dir->msg);
374 Jmsg(jcr, M_INFO, 0, _("Failed command: %s\n"), jcr->errmsg);
376 Jmsg(jcr, M_FATAL, 0, _("\n"
377 " Device \"%s\" with MediaType \"%s\" requested by DIR not found in SD Device resources.\n"),
378 dev_name.c_str(), media_type.c_str());
379 bnet_fsend(dir, NO_device, dev_name.c_str());
380 Dmsg1(100, ">dird: %s\n", dir->msg);
383 unbash_spaces(dir->msg);
384 pm_strcpy(jcr->errmsg, dir->msg);
386 Jmsg(jcr, M_INFO, 0, _("Failed command: %s\n"), jcr->errmsg);
388 Jmsg(jcr, M_FATAL, 0, _("Bad Use Device command: %s\n"), jcr->errmsg);
389 bnet_fsend(dir, BAD_use, jcr->errmsg);
390 Dmsg1(100, ">dird: %s\n", dir->msg);
/*
 * Cleanup of the per-store device lists.
 * NOTE(review): release of the DIRSTORE objects and of dirstore itself is
 * not visible in this listing -- confirm it happens on elided lines.
 */
395 foreach_alist(store, dirstore) {
396 delete store->device;
/*
 * search_res_for_device -- look for a configured device or autochanger
 * matching device_name and try to reserve it.
 *
 * First pass: every R_DEVICE resource whose name matches device_name
 * (fnmatch, so device_name is treated as a pattern) and whose media type
 * equals the media type of the store is handed to reserve_device().
 * Second pass: every R_AUTOCHANGER resource whose name matches; each
 * device of that changer is tried in turn, where a result of -1 is a
 * hard error and 0 means the device must be waited for.
 * On success an OK_device reply is sent to the Director; for a changer
 * the reply carries the name of the actual device that was reserved.
 * Returns 0 when nothing was found.
 *
 * NOTE(review): declarations of device, stat, ok and dev_name, and the
 * other return statements, are elided from this listing.
 */
406 static int search_res_for_device(JCR *jcr, DIRSTORE *store, char *device_name, int append)
409 AUTOCHANGER *changer;
410 BSOCK *dir = jcr->dir_bsock;
414 Dmsg1(100, "Search res for %s\n", device_name);
415 foreach_res(device, R_DEVICE) {
416 Dmsg1(100, "Try res=%s\n", device->hdr.name);
417 /* Find resource, and make sure we were able to open it */
418 if (fnmatch(device_name, device->hdr.name, 0) == 0 &&
419 strcmp(device->media_type, store->media_type) == 0) {
420 stat = reserve_device(jcr, store, device, device_name, append);
424 Dmsg1(220, "Got: %s", dir->msg);
/* bash_spaces modifies the caller buffer in place before it is sent. */
425 bash_spaces(device_name);
426 ok = bnet_fsend(dir, OK_device, device_name);
427 Dmsg1(100, ">dird: %s\n", dir->msg);
431 foreach_res(changer, R_AUTOCHANGER) {
432 Dmsg1(100, "Try changer res=%s\n", changer->hdr.name);
433 /* Find resource, and make sure we were able to open it */
434 if (fnmatch(device_name, changer->hdr.name, 0) == 0) {
435 /* Try each device in this AutoChanger */
436 foreach_alist(device, changer->device) {
437 Dmsg1(100, "Try changer device %s\n", device->hdr.name);
/* Unlike the first pass, no media type comparison is visible on this path. */
438 stat = reserve_device(jcr, store, device, device_name, append);
439 if (stat == -1) { /* hard error */
442 if (stat == 0) { /* must wait, try next one */
446 Dmsg1(100, "Device %s opened.\n", device_name);
447 pm_strcpy(dev_name, device->hdr.name);
448 bash_spaces(dev_name);
449 ok = bnet_fsend(dir, OK_device, dev_name.c_str()); /* Return real device name */
450 Dmsg1(100, ">dird: %s\n", dir->msg);
455 return 0; /* nothing found */
459 * Returns: 1 -- OK, have DCR
/*
 * reserve_device -- initialise one device resource and reserve it for
 * this job.
 *
 * init_dev() is called for the resource; the warning messages below are
 * issued when the device could not be opened, with separate wording for a
 * device that belongs to a changer. A DCR is then created with new_dcr();
 * if that fails a 3926 reply is sent to the Director. Pool name, pool
 * type, media type and device name are copied into the DCR, and the
 * device is reserved for append when append equals SD_APPEND, otherwise
 * for read.
 *
 * NOTE(review): declarations of dcr and ok, the conditions guarding the
 * init_dev call and the error branches, and all return statements are
 * elided from this listing.
 */
463 static int reserve_device(JCR *jcr, DIRSTORE *store, DEVRES *device, char *device_name, int append)
/* All four DCR string copies below are bounded by this length. */
467 const int name_len = MAX_NAME_LENGTH;
469 device->dev = init_dev(jcr, NULL, device);
472 if (dev_cap(device->dev, CAP_AUTOCHANGER)) {
473 Jmsg(jcr, M_WARNING, 0, _("\n"
474 " Device \"%s\" in changer \"%s\" requested by DIR could not be opened or does not exist.\n"),
475 device->hdr.name, device_name);
477 Jmsg(jcr, M_WARNING, 0, _("\n"
478 " Device \"%s\" requested by DIR could not be opened or does not exist.\n"),
483 Dmsg1(100, "Found device %s\n", device->hdr.name);
484 dcr = new_dcr(jcr, device->dev);
486 BSOCK *dir = jcr->dir_bsock;
487 bnet_fsend(dir, _("3926 Could not get dcr for device: %s\n"), device_name);
488 Dmsg1(100, ">dird: %s\n", dir->msg);
/* Record in the DCR which pool, media type and device name were requested. */
492 bstrncpy(dcr->pool_name, store->pool_name, name_len);
493 bstrncpy(dcr->pool_type, store->pool_type, name_len);
494 bstrncpy(dcr->media_type, store->media_type, name_len);
495 bstrncpy(dcr->dev_name, device_name, name_len);
496 if (append == SD_APPEND) {
497 ok = reserve_device_for_append(dcr);
499 ok = reserve_device_for_read(dcr);
511 * Query Device command from Director
512 * Sends Storage Daemon's information on the device to the
513 * caller (presumably the Director).
514 * This command always returns "true" so that the line is
515 * not closed on an error.
/*
 * query_cmd -- answer a query device command from the Director.
 *
 * Parses the device name with the query_device template, then searches
 * R_DEVICE resources and afterwards R_AUTOCHANGER resources for a name
 * matching it (fnmatch). A matching device is initialised with init_dev()
 * and reported through dir_update_device(); a matching changer that has
 * at least one device is reported through dir_update_changer(). The reply
 * is OK_query or NO_query. If nothing matched, NO_device is sent; if the
 * command could not be parsed, BAD_query is sent.
 *
 * NOTE(review): query_device, OK_query, NO_query and BAD_query are
 * commented out near the top of this file, so this function is presumably
 * excluded from the build by a guard not visible in this listing --
 * confirm. Declarations of ok and device and all return statements are
 * also elided.
 */
518 bool query_cmd(JCR *jcr)
520 POOL_MEM dev_name, VolumeName, MediaType, ChangerName;
521 BSOCK *dir = jcr->dir_bsock;
523 AUTOCHANGER *changer;
526 Dmsg1(100, "Query_cmd: %s", dir->msg);
527 ok = sscanf(dir->msg, query_device, dev_name.c_str()) == 1;
528 Dmsg1(100, "<dird: %s\n", dir->msg);
530 unbash_spaces(dev_name);
532 foreach_res(device, R_DEVICE) {
533 /* Find resource, and make sure we were able to open it */
534 if (fnmatch(dev_name.c_str(), device->hdr.name, 0) == 0) {
536 device->dev = init_dev(jcr, NULL, device);
542 ok = dir_update_device(jcr, device->dev);
544 ok = bnet_fsend(dir, OK_query);
546 bnet_fsend(dir, NO_query);
551 foreach_res(changer, R_AUTOCHANGER) {
552 /* Find resource, and make sure we were able to open it */
553 if (fnmatch(dev_name.c_str(), changer->hdr.name, 0) == 0) {
555 if (!changer->device || changer->device->size() == 0) {
556 continue; /* no devices */
558 ok = dir_update_changer(jcr, changer);
560 ok = bnet_fsend(dir, OK_query);
562 bnet_fsend(dir, NO_query);
567 /* If we get here, the device/autochanger was not found */
569 unbash_spaces(dir->msg);
570 pm_strcpy(jcr->errmsg, dir->msg);
571 bnet_fsend(dir, NO_device, dev_name.c_str());
572 Dmsg1(100, ">dird: %s\n", dir->msg);
574 unbash_spaces(dir->msg);
575 pm_strcpy(jcr->errmsg, dir->msg);
576 bnet_fsend(dir, BAD_query, jcr->errmsg);
577 Dmsg1(100, ">dird: %s\n", dir->msg);
587 * Destroy the Job Control Record and associated
588 * resources (sockets).
/*
 * stored_free_jcr -- release the Storage daemon specific parts of a JCR.
 *
 * Closes the File daemon socket if one is attached, frees the job name,
 * client name, fileset name and fileset MD5 buffers, removes the restore
 * bootstrap file from disk and frees its name, reports a fatal message if
 * the JCR is still linked to a device, and destroys the job_start_wait
 * condition variable.
 *
 * NOTE(review): some guards and NULL assignments, and whatever follows
 * the last visible statement, are elided from this listing.
 */
590 void stored_free_jcr(JCR *jcr)
592 if (jcr->file_bsock) {
593 bnet_close(jcr->file_bsock);
594 jcr->file_bsock = NULL;
/*
 * NOTE(review): job_name is released with free_pool_memory while the
 * other three buffers, also obtained with get_pool_memory in job_cmd(),
 * use free_memory. Presumably the two are equivalent -- confirm.
 */
597 free_pool_memory(jcr->job_name);
599 if (jcr->client_name) {
600 free_memory(jcr->client_name);
601 jcr->client_name = NULL;
603 if (jcr->fileset_name) {
604 free_memory(jcr->fileset_name);
606 if (jcr->fileset_md5) {
607 free_memory(jcr->fileset_md5);
613 if (jcr->RestoreBootstrap) {
/* Delete the bootstrap file itself before freeing the buffer holding its path. */
614 unlink(jcr->RestoreBootstrap);
615 free_pool_memory(jcr->RestoreBootstrap);
616 jcr->RestoreBootstrap = NULL;
618 if (jcr->next_dev || jcr->prev_dev) {
619 Emsg0(M_FATAL, 0, _("In free_jcr(), but still attached to device!!!!\n"));
621 pthread_cond_destroy(&jcr->job_start_wait);