2 * Job control and execution for Storage Daemon
10 Copyright (C) 2000-2005 Kern Sibbald
12 This program is free software; you can redistribute it and/or
13 modify it under the terms of the GNU General Public License
14 version 2 as ammended with additional clauses defined in the
15 file LICENSE in the main source directory.
17 This program is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 the file LICENSE for additional details.
27 /* Imported variables */
28 extern uint32_t VolSessionTime;
30 /* Imported functions */
31 extern uint32_t newVolSessionId();
33 /* Forward referenced functions */
34 static bool use_storage_cmd(JCR *jcr);
36 /* Requests from the Director daemon */
/*
 * Job command. It carries 13 conversions and job_cmd() rejects the command
 * unless sscanf() converts all 13. The %127s widths cap every string field
 * at 127 characters.
 */
37 static char jobcmd[] = "JobId=%d job=%127s job_name=%127s client_name=%127s "
38 "type=%d level=%d FileSet=%127s NoAttr=%d SpoolAttr=%d FileSetMD5=%127s "
39 "SpoolData=%d WritePartAfterJob=%d NewVol=%d\n";
/*
 * Storage request header: four names followed by three integers.
 * use_storage_cmd() requires all 7 conversions to succeed.
 */
40 static char use_storage[] = "use storage=%127s media_type=%127s "
41 "pool_name=%127s pool_type=%127s append=%d copy=%d stripe=%d\n";
/* One device name per line; parsed in the device loop of use_storage_cmd(). */
42 static char use_device[] = "use device=%127s\n";
/*
 * NOTE(review): query_device is commented out here, yet query_cmd() in this
 * file still passes it to sscanf() -- confirm query_cmd() is compiled out.
 */
43 //static char query_device[] = "query device=%127s";
46 /* Responses sent to Director daemon */
/* Reply to a valid Job command: session id, session time and the FD key. */
47 static char OKjob[] = "3000 OK Job SDid=%u SDtime=%u Authorization=%s\n";
48 static char OK_device[] = "3000 OK use device device=%s\n";
49 static char NO_device[] = "3924 Device \"%s\" not in SD Device resources.\n";
50 //static char NOT_open[] = "3925 Device \"%s\" could not be opened or does not exist.\n";
51 static char BAD_use[] = "3913 Bad use command: %s\n";
52 static char BAD_job[] = "3915 Bad Job command: %s\n";
/*
 * NOTE(review): OK_query, NO_query and BAD_query are commented out below but
 * are referenced by query_cmd() in this file -- same concern as query_device.
 */
53 //static char OK_query[] = "3001 OK query\n";
54 //static char NO_query[] = "3918 Query failed\n";
55 //static char BAD_query[] = "3917 Bad query command: %s\n";
58 * Director requests us to start a job
59 * Basic tasks done here:
60 * - We pickup the JobId to be run from the Director.
61 * - We pickup the device, media, and pool from the Director
62 * - Wait for a connection from the File Daemon (FD)
63 * - Accept commands from the FD (i.e. run the job)
64 * - Return when the connection is terminated or
/*
 * job_cmd -- handle the Job command received from the Director.
 *
 * Parses dir->msg with the jobcmd format, stores the parsed values in the
 * JCR, assigns a new VolSessionId/VolSessionTime, and answers with OKjob,
 * which carries a freshly generated authorization key for the File daemon.
 * If the command does not parse, BAD_job is sent back and the job status is
 * set to JS_ErrorTerminated.
 *
 * NOTE(review): braces, return statements and some declarations (JobId,
 * ojcr, auth_key) are not visible in this view, so the returned values
 * could not be checked here.
 */
67 bool job_cmd(JCR *jcr)
71 BSOCK *dir = jcr->dir_bsock;
72 POOL_MEM job_name, client_name, job, fileset_name, fileset_md5;
73 int JobType, level, spool_attributes, no_attributes, spool_data;
74 int write_part_after_job, NewVol;
79 * Get JobId and permissions from Director
81 Dmsg1(100, "<dird: %s\n", dir->msg);
/* All 13 fields of jobcmd must convert, otherwise the command is refused. */
82 if (sscanf(dir->msg, jobcmd, &JobId, job.c_str(), job_name.c_str(),
84 &JobType, &level, fileset_name.c_str(), &no_attributes,
85 &spool_attributes, fileset_md5.c_str(), &spool_data,
86 &write_part_after_job, &NewVol) != 13) {
/*
 * The raw command is copied to jcr->errmsg before replying, presumably
 * because bnet_fsend() reuses dir->msg for the outgoing text -- confirm.
 */
87 pm_strcpy(jcr->errmsg, dir->msg);
88 bnet_fsend(dir, BAD_job, jcr->errmsg);
89 Dmsg1(100, ">dird: %s\n", dir->msg);
90 Emsg1(M_FATAL, 0, _("Bad Job Command from Director: %s\n"), jcr->errmsg);
91 set_jcr_job_status(jcr, JS_ErrorTerminated);
95 * Since this job could be rescheduled, we
96 * check to see if we have it already. If so
97 * free the old jcr and use the new one.
99 ojcr = get_jcr_by_full_name(job.c_str());
/* Only an existing JCR that has not been authenticated yet is considered. */
100 if (ojcr && !ojcr->authenticated) {
/* The pointer is narrowed to unsigned purely for this %x debug print. */
101 Dmsg2(100, "Found ojcr=0x%x Job %s\n", (unsigned)(long)ojcr, job.c_str());
/* Session identifiers for this job. */
105 jcr->VolSessionId = newVolSessionId();
106 jcr->VolSessionTime = VolSessionTime;
107 bstrncpy(jcr->Job, job, sizeof(jcr->Job));
/*
 * Each name goes through unbash_spaces() (presumably undoing the space
 * encoding used on the wire -- confirm) and is then copied into a pool
 * buffer obtained with get_pool_memory(PM_NAME).
 */
108 unbash_spaces(job_name);
109 jcr->job_name = get_pool_memory(PM_NAME);
110 pm_strcpy(jcr->job_name, job_name);
111 unbash_spaces(client_name);
112 jcr->client_name = get_pool_memory(PM_NAME);
113 pm_strcpy(jcr->client_name, client_name);
114 unbash_spaces(fileset_name);
115 jcr->fileset_name = get_pool_memory(PM_NAME);
116 pm_strcpy(jcr->fileset_name, fileset_name);
117 jcr->JobType = JobType;
118 jcr->JobLevel = level;
119 jcr->no_attributes = no_attributes;
120 jcr->spool_attributes = spool_attributes;
121 jcr->spool_data = spool_data;
122 jcr->write_part_after_job = write_part_after_job;
123 jcr->fileset_md5 = get_pool_memory(PM_NAME);
124 pm_strcpy(jcr->fileset_md5, fileset_md5);
125 jcr->NewVolEachJob = NewVol;
/*
 * The job starts unauthenticated; handle_filed_connection() sets the flag
 * once authenticate_filed() succeeds.
 */
127 jcr->authenticated = false;
130 * Pass back an authorization key for the File daemon
132 make_session_key(auth_key, NULL, 1);
133 bnet_fsend(dir, OKjob, jcr->VolSessionId, jcr->VolSessionTime, auth_key);
134 Dmsg1(100, ">dird: %s", dir->msg);
/* Keep a duplicate of the key in the JCR, then wipe the local buffer. */
135 jcr->sd_auth_key = bstrdup(auth_key);
136 memset(auth_key, 0, sizeof(auth_key));
137 generate_daemon_event(jcr, "JobStart");
/*
 * use_cmd -- handle the Director's storage/device selection request.
 *
 * Delegates the work to use_storage_cmd(). When that fails, the job status
 * is set to JS_ErrorTerminated and the authorization key stored in the JCR
 * is overwritten with zeros over its current string length.
 *
 * NOTE(review): the return statements are not visible in this view.
 */
141 bool use_cmd(JCR *jcr)
144 * Wait for the device, media, and pool information
146 if (!use_storage_cmd(jcr)) {
147 set_jcr_job_status(jcr, JS_ErrorTerminated);
148 memset(jcr->sd_auth_key, 0, strlen(jcr->sd_auth_key));
/*
 * run_cmd -- handle the Director's run command.
 *
 * Sets the job status to JS_WaitFD, reports it to the Director, and then
 * waits on jcr->job_start_wait (signalled by handle_filed_connection())
 * with an absolute deadline 30 minutes ahead. Afterwards the stored
 * authorization key is zeroed and, if the job is authenticated and has not
 * been canceled, run_job() is called.
 *
 * NOTE(review): the switch case labels, the declarations of tv/tz/errstat,
 * the mutex lock/unlock calls and the return statements are not visible in
 * this view.
 */
154 bool run_cmd(JCR *jcr)
158 struct timespec timeout;
161 Dmsg1(100, "Run_cmd: %s\n", jcr->dir_bsock->msg);
162 /* The following jobs don't need the FD */
163 switch (jcr->JobType) {
/*
 * For the job types selected by the (not visible) case labels, the job is
 * marked authenticated directly, without any File daemon connection.
 */
167 jcr->authenticated = true;
172 set_jcr_job_status(jcr, JS_WaitFD); /* wait for FD to connect */
173 dir_send_job_status(jcr);
/* Build the absolute deadline for pthread_cond_timedwait(): now + 30 min. */
175 gettimeofday(&tv, &tz);
176 timeout.tv_nsec = tv.tv_usec * 1000;
177 timeout.tv_sec = tv.tv_sec + 30 * 60; /* wait 30 minutes */
179 Dmsg1(100, "%s waiting on FD to contact SD\n", jcr->Job);
181 * Wait for the File daemon to contact us to start the Job,
182 * when he does, we will be released, unless the 30 minutes
/* The wait is retried until the job is canceled. */
186 for ( ;!job_canceled(jcr); ) {
/*
 * NOTE(review): pthread_cond_timedwait() requires jcr->mutex to be locked
 * by the caller; the lock/unlock calls are not visible in this view.
 */
187 errstat = pthread_cond_timedwait(&jcr->job_start_wait, &jcr->mutex, &timeout);
/*
 * Signalled (0) or deadline reached (ETIMEDOUT). The body of this branch is
 * not visible; presumably it leaves the loop -- confirm.
 */
188 if (errstat == 0 || errstat == ETIMEDOUT) {
/* The key is wiped once the wait is over, whatever the outcome. */
194 memset(jcr->sd_auth_key, 0, strlen(jcr->sd_auth_key));
196 if (jcr->authenticated && !job_canceled(jcr)) {
197 Dmsg1(100, "Running job %s\n", jcr->Job);
198 run_job(jcr); /* Run the job */
204 * After receiving a connection (in job.c) if it is
205 * from the File daemon, this routine is called.
/*
 * handle_filed_connection -- attach an incoming File daemon connection to
 * its job.
 *
 *   fd        socket of the connecting File daemon
 *   job_name  full job name used to look up the JCR
 *
 * Looks the JCR up by full name, stores the socket in it, authenticates the
 * File daemon, and signals jcr->job_start_wait so that the thread waiting
 * in run_cmd() can continue. On authentication failure the job status is
 * set to JS_ErrorTerminated before the signal is sent.
 *
 * NOTE(review): braces, returns and the declaration of jcr are not visible
 * in this view.
 */
207 void handle_filed_connection(BSOCK *fd, char *job_name)
211 bmicrosleep(0, 50000); /* wait 50 millisecs */
212 if (!(jcr=get_jcr_by_full_name(job_name))) {
213 Jmsg1(NULL, M_FATAL, 0, _("Job name not found: %s\n"), job_name);
214 Dmsg1(100, "Job name not found: %s\n", job_name);
/* Cross-link the socket and the JCR. */
218 jcr->file_bsock = fd;
219 jcr->file_bsock->jcr = jcr;
221 Dmsg1(110, "Found Job %s\n", job_name);
/* A second connection for an already authenticated job is reported as fatal. */
223 if (jcr->authenticated) {
224 Jmsg2(jcr, M_FATAL, 0, "Hey!!!! JobId %u Job %s already authenticated.\n",
225 jcr->JobId, jcr->Job);
231 * Authenticate the File daemon
/*
 * Short-circuit: authenticate_filed() is only called when the job is not
 * already authenticated.
 */
233 if (jcr->authenticated || !authenticate_filed(jcr)) {
234 Dmsg1(100, "Authentication failed Job %s\n", jcr->Job);
235 Jmsg(jcr, M_FATAL, 0, _("Unable to authenticate File daemon\n"));
237 jcr->authenticated = true;
238 Dmsg1(110, "OK Authentication Job %s\n", jcr->Job);
242 if (!jcr->authenticated) {
243 set_jcr_job_status(jcr, JS_ErrorTerminated);
/* run_cmd() waits on this same condition variable. */
245 pthread_cond_signal(&jcr->job_start_wait); /* wake waiting job */
253 * Use Device command from Director
254 * He tells us what Device Name to use, the Media Type,
255 * the Pool Name, and the Pool Type.
257 * Ensure that the device exists and is opened, then store
258 * the media and pool info in the JCR.
/*
 * Name fields of one storage entry. use_storage_cmd() fills them through
 * store->name, store->media_type, store->pool_name and store->pool_type on
 * a DIRSTORE object.
 * NOTE(review): the struct header and the `device` list member that
 * use_storage_cmd() also uses are not visible in this view.
 */
263 char name[MAX_NAME_LENGTH];
264 char media_type[MAX_NAME_LENGTH];
265 char pool_name[MAX_NAME_LENGTH];
266 char pool_type[MAX_NAME_LENGTH];
/* Forward declaration; the definition follows use_storage_cmd(). */
269 static int search_res_for_device(JCR *jcr, DIRSTORE *store, char *device_name, int append);
/*
 * use_storage_cmd -- read the Director's storage/device list and pick a
 * device for the job.
 *
 * Phase 1: parse "use storage" lines (7 fields each) and, for each of them,
 *          the "use device" lines that follow, building a list of DIRSTORE
 *          entries, each with its own list of device names.
 * Phase 2: call search_res_for_device() for every (storage, device) pair;
 *          a result of 1 means a device was found, 0 means the device is
 *          busy. wait_for_device() is consulted before giving up.
 * Failure: NO_device is sent when no matching device exists, BAD_use when
 *          the command could not be parsed.
 * Cleanup: the per-storage device lists are deleted.
 *
 * NOTE(review): many lines (declarations, braces, goto/labels, returns and
 * the opening of the do/while loop) are not visible in this view, so the
 * exact control flow between the phases could not be verified.
 */
271 static bool use_storage_cmd(JCR *jcr)
273 POOL_MEM store_name, dev_name, media_type, pool_name, pool_type;
274 BSOCK *dir = jcr->dir_bsock;
283 * If there are multiple devices, the director sends us
284 * use_device for each device that it wants to use.
286 Dmsg1(100, "<dird: %s", dir->msg);
/* The list is created with not_owned_by_alist. */
287 dirstore = New(alist(10, not_owned_by_alist));
/* All 7 fields of use_storage must convert for the line to be accepted. */
289 ok = sscanf(dir->msg, use_storage, store_name.c_str(),
290 media_type.c_str(), pool_name.c_str(),
291 pool_type.c_str(), &append, &Copy, &Stripe) == 7;
295 unbash_spaces(store_name);
296 unbash_spaces(media_type);
297 unbash_spaces(pool_name);
298 unbash_spaces(pool_type);
/* One DIRSTORE per storage line; zeroed right after being appended. */
299 store = new DIRSTORE;
300 dirstore->append(store);
301 memset(store, 0, sizeof(DIRSTORE));
302 store->device = New(alist(10));
303 bstrncpy(store->name, store_name, sizeof(store->name));
304 bstrncpy(store->media_type, media_type, sizeof(store->media_type));
305 bstrncpy(store->pool_name, pool_name, sizeof(store->pool_name));
306 bstrncpy(store->pool_type, pool_type, sizeof(store->pool_type));
308 /* Now get all devices */
309 while (bnet_recv(dir) >= 0) {
310 ok = sscanf(dir->msg, use_device, dev_name.c_str()) == 1;
/* Each device name is stored as a bstrdup() copy. */
314 unbash_spaces(dev_name);
315 store->device->append(bstrdup(dev_name.c_str()));
/*
 * Tail of an outer do/while (its opening is not visible): another message
 * is read only while the previous parse succeeded.
 */
317 } while (ok && bnet_recv(dir) >= 0);
320 /* This loop is debug code and can be removed */
321 /* ***FIXME**** remove after 1.38 release */
322 foreach_alist(store, dirstore) {
323 Dmsg4(100, "Storage=%s media_type=%s pool=%s pool_type=%s\n",
324 store->name, store->media_type, store->pool_name,
326 foreach_alist(device_name, store->device) {
327 Dmsg1(100, " Device=%s\n", device_name);
333 * At this point, we have a list of all the Director's Storage
334 * resources indicated for this Job, which include Pool, PoolType,
335 * storage name, and Media type.
336 * Then for each of the Storage resources, we have a list of
337 * device names that were given.
339 * Wiffle through them and find one that can do the backup.
343 init_jcr_device_wait_timers(jcr);
/* need_wait is an int used as a flag; where it is set is not visible here. */
345 int need_wait = false;
346 foreach_alist(store, dirstore) {
347 foreach_alist(device_name, store->device) {
349 stat = search_res_for_device(jcr, store, device_name, append);
350 if (stat == 1) { /* found available device */
353 dcr->Stripe = Stripe;
356 } else if (stat == 0) { /* device busy */
362 * If there is some device for which we can wait, then
363 * wait and try again until the wait time expires
/* Stop retrying when nothing is worth waiting for or the wait itself fails. */
365 if (!need_wait || !wait_for_device(jcr, jcr->errmsg, first)) {
/* No usable device: report the failed command and answer NO_device. */
371 unbash_spaces(dir->msg);
372 pm_strcpy(jcr->errmsg, dir->msg);
373 Jmsg(jcr, M_INFO, 0, _("Failed command: %s\n"), jcr->errmsg);
375 Jmsg(jcr, M_FATAL, 0, _("\n"
376 " Device \"%s\" with MediaType \"%s\" requested by DIR not found in SD Device resources.\n"),
377 dev_name.c_str(), media_type.c_str());
378 bnet_fsend(dir, NO_device, dev_name.c_str());
379 Dmsg1(100, ">dird: %s\n", dir->msg);
/* Unparsable command: report it and answer BAD_use. */
382 unbash_spaces(dir->msg);
383 pm_strcpy(jcr->errmsg, dir->msg);
385 Jmsg(jcr, M_INFO, 0, _("Failed command: %s\n"), jcr->errmsg);
387 Jmsg(jcr, M_FATAL, 0, _("Bad Use Device command: %s\n"), jcr->errmsg);
388 bnet_fsend(dir, BAD_use, jcr->errmsg);
389 Dmsg1(100, ">dird: %s\n", dir->msg);
/*
 * NOTE(review): only the per-storage device lists are visibly deleted here.
 * The DIRSTORE objects were allocated with new into a list created with
 * not_owned_by_alist, and the device names were bstrdup()'ed -- confirm
 * that both are released in the lines not visible in this view.
 */
394 foreach_alist(store, dirstore) {
395 delete store->device;
406 * Returns: 1 -- OK, have DCR
/*
 * search_res_for_device -- try to reserve a device matching device_name.
 *
 *   jcr          job the device is wanted for
 *   store        storage entry supplying pool name, pool type and media type
 *   device_name  name requested by the Director; used as the fnmatch()
 *                pattern against resource names
 *   append       SD_APPEND selects reserve_device_for_append(); any other
 *                value selects reserve_device_for_read()
 *
 * Device resources are searched first (resource name and media type must
 * both match), then Autochanger resources (name only), trying each device
 * of a matching changer. On success OK_device is sent to the Director.
 *
 * The caller, use_storage_cmd(), treats 1 as "found available device" and
 * 0 as "device busy".
 * NOTE(review): the final `return 0` is commented "nothing found", which the
 * caller cannot distinguish from "device busy". The other return statements
 * are not visible in this view -- confirm the intended return codes.
 */
410 static int search_res_for_device(JCR *jcr, DIRSTORE *store, char *device_name, int append)
413 AUTOCHANGER *changer;
414 BSOCK *dir = jcr->dir_bsock;
418 Dmsg1(100, "Search res for %s\n", device_name);
419 foreach_res(device, R_DEVICE) {
420 Dmsg1(100, "Try res=%s\n", device->hdr.name);
421 /* Find resource, and make sure we were able to open it */
/* Requested name is the pattern; the media type must match exactly. */
422 if (fnmatch(device_name, device->hdr.name, 0) == 0 &&
423 strcmp(device->media_type, store->media_type) == 0) {
424 const int name_len = MAX_NAME_LENGTH;
/*
 * The guarding condition is not visible; presumably the device is only
 * initialised when device->dev is not set yet -- confirm.
 */
426 device->dev = init_dev(jcr, NULL, device);
429 Jmsg(jcr, M_WARNING, 0, _("\n"
430 " Device \"%s\" requested by DIR could not be opened or does not exist.\n"),
434 Dmsg1(100, "Found device %s\n", device->hdr.name);
435 dcr = new_dcr(jcr, device->dev);
437 bnet_fsend(dir, _("3926 Could not get dcr for device: %s\n"), device_name);
438 Dmsg1(100, ">dird: %s\n", dir->msg);
/* Copy the Director's pool and media information into the new DCR. */
442 bstrncpy(dcr->pool_name, store->pool_name, name_len);
443 bstrncpy(dcr->pool_type, store->pool_type, name_len);
444 bstrncpy(dcr->media_type, store->media_type, name_len);
445 bstrncpy(dcr->dev_name, device_name, name_len);
446 if (append == SD_APPEND) {
447 ok = reserve_device_for_append(dcr);
449 ok = reserve_device_for_read(dcr);
455 Dmsg1(220, "Got: %s", dir->msg);
/* bash_spaces() is applied to the caller's device_name buffer itself. */
456 bash_spaces(device_name);
457 ok = bnet_fsend(dir, OK_device, device_name);
458 Dmsg1(100, ">dird: %s\n", dir->msg);
/* Second pass: the requested name may refer to an autochanger. */
462 foreach_res(changer, R_AUTOCHANGER) {
463 Dmsg1(100, "Try changer res=%s\n", changer->hdr.name);
464 /* Find resource, and make sure we were able to open it */
465 if (fnmatch(device_name, changer->hdr.name, 0) == 0) {
466 const int name_len = MAX_NAME_LENGTH;
467 /* Try each device in this AutoChanger */
468 foreach_alist(device, changer->device) {
469 Dmsg1(100, "Try changer device %s\n", device->hdr.name);
471 device->dev = init_dev(jcr, NULL, device);
474 Dmsg1(100, "Device %s could not be opened. Skipped\n", device_name);
475 Jmsg(jcr, M_WARNING, 0, _("\n"
476 " Device \"%s\" in changer \"%s\" requested by DIR could not be opened or does not exist.\n"),
477 device->hdr.name, device_name);
/* Changer devices whose autoselect flag is off are skipped. */
480 if (!device->dev->autoselect) {
481 continue; /* device is not available */
483 dcr = new_dcr(jcr, device->dev);
485 bnet_fsend(dir, _("3926 Could not get dcr for device: %s\n"), device_name);
486 Dmsg1(100, ">dird: %s\n", dir->msg);
489 Dmsg1(100, "Found changer device %s\n", device->hdr.name);
490 bstrncpy(dcr->pool_name, store->pool_name, name_len);
491 bstrncpy(dcr->pool_type, store->pool_type, name_len);
492 bstrncpy(dcr->media_type, store->media_type, name_len);
493 bstrncpy(dcr->dev_name, device_name, name_len);
495 if (append == SD_APPEND) {
496 ok = reserve_device_for_append(dcr);
498 ok = reserve_device_for_read(dcr);
501 Jmsg(jcr, M_WARNING, 0, _("Could not reserve device: %s\n"), device_name);
506 Dmsg1(100, "Device %s opened.\n", device_name);
/*
 * The Director is told the name of the device resource that was selected,
 * not the changer name it asked for.
 */
507 pm_strcpy(dev_name, device->hdr.name);
508 bash_spaces(dev_name);
509 ok = bnet_fsend(dir, OK_device, dev_name.c_str()); /* Return real device name */
510 Dmsg1(100, ">dird: %s\n", dir->msg);
515 return 0; /* nothing found */
521 * Query Device command from Director
522 * Sends Storage Daemon's information on the device to the
523 * caller (presumably the Director).
524 * This command always returns "true" so that the line is
525 * not closed on an error.
/*
 * query_cmd -- handle the Director's "query device" request.
 *
 * Parses the device name from dir->msg, then looks for a Device resource
 * and, after that, an Autochanger resource whose name matches it. For a
 * match, dir_update_device() or dir_update_changer() is called and OK_query
 * or NO_query is sent. If nothing matches, NO_device is sent; if the command
 * does not parse, BAD_query is sent.
 *
 * NOTE(review): the format strings query_device, OK_query, NO_query and
 * BAD_query used below appear in this file only as commented-out
 * declarations -- confirm that this function is actually compiled.
 * Braces, conditions and return statements are not visible in this view.
 */
528 bool query_cmd(JCR *jcr)
530 POOL_MEM dev_name, VolumeName, MediaType, ChangerName;
531 BSOCK *dir = jcr->dir_bsock;
533 AUTOCHANGER *changer;
536 Dmsg1(100, "Query_cmd: %s", dir->msg);
537 ok = sscanf(dir->msg, query_device, dev_name.c_str()) == 1;
538 Dmsg1(100, "<dird: %s\n", dir->msg);
540 unbash_spaces(dev_name);
542 foreach_res(device, R_DEVICE) {
543 /* Find resource, and make sure we were able to open it */
/* The requested name is the fnmatch() pattern; the resource name is the string. */
544 if (fnmatch(dev_name.c_str(), device->hdr.name, 0) == 0) {
546 device->dev = init_dev(jcr, NULL, device);
552 ok = dir_update_device(jcr, device->dev);
554 ok = bnet_fsend(dir, OK_query);
556 bnet_fsend(dir, NO_query);
561 foreach_res(changer, R_AUTOCHANGER) {
562 /* Find resource, and make sure we were able to open it */
563 if (fnmatch(dev_name.c_str(), changer->hdr.name, 0) == 0) {
/* A changer with no device list, or an empty one, is skipped. */
565 if (!changer->device || changer->device->size() == 0) {
566 continue; /* no devices */
568 ok = dir_update_changer(jcr, changer);
570 ok = bnet_fsend(dir, OK_query);
572 bnet_fsend(dir, NO_query);
577 /* If we get here, the device/autochanger was not found */
579 unbash_spaces(dir->msg);
580 pm_strcpy(jcr->errmsg, dir->msg);
581 bnet_fsend(dir, NO_device, dev_name.c_str());
582 Dmsg1(100, ">dird: %s\n", dir->msg);
/* Unparsable command: echo it back inside BAD_query. */
584 unbash_spaces(dir->msg);
585 pm_strcpy(jcr->errmsg, dir->msg);
586 bnet_fsend(dir, BAD_query, jcr->errmsg);
587 Dmsg1(100, ">dird: %s\n", dir->msg);
597 * Destroy the Job Control Record and associated
598 * resources (sockets).
/*
 * stored_free_jcr -- release the Storage daemon specific parts of a JCR.
 *
 * Closes the File daemon socket, frees the name buffers that job_cmd()
 * allocated, unlinks and frees the RestoreBootstrap file name, reports a
 * fatal error if the JCR is still linked to a device, and destroys the
 * job_start_wait condition variable.
 *
 * NOTE(review): job_name is released with free_pool_memory() while
 * client_name, fileset_name and fileset_md5 use free_memory(), although
 * job_cmd() obtains all four from get_pool_memory(PM_NAME) -- confirm the
 * two are equivalent. Of these four, only client_name is visibly reset to
 * NULL afterwards.
 * NOTE(review): braces are not visible and the function may continue past
 * the last line shown in this view.
 */
600 void stored_free_jcr(JCR *jcr)
602 if (jcr->file_bsock) {
603 bnet_close(jcr->file_bsock);
604 jcr->file_bsock = NULL;
607 free_pool_memory(jcr->job_name);
609 if (jcr->client_name) {
610 free_memory(jcr->client_name);
611 jcr->client_name = NULL;
613 if (jcr->fileset_name) {
614 free_memory(jcr->fileset_name);
616 if (jcr->fileset_md5) {
617 free_memory(jcr->fileset_md5);
/* The bootstrap file is removed from disk before its name is freed. */
623 if (jcr->RestoreBootstrap) {
624 unlink(jcr->RestoreBootstrap);
625 free_pool_memory(jcr->RestoreBootstrap);
626 jcr->RestoreBootstrap = NULL;
/* A JCR that is still linked into a device's chain is reported as fatal. */
628 if (jcr->next_dev || jcr->prev_dev) {
629 Emsg0(M_FATAL, 0, _("In free_jcr(), but still attached to device!!!!\n"));
631 pthread_cond_destroy(&jcr->job_start_wait);