2 * Bacula File Daemon Job processing
4 * Kern Sibbald, October MM
8 Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker
10 This program is free software; you can redistribute it and/or
11 modify it under the terms of the GNU General Public License as
12 published by the Free Software Foundation; either version 2 of
13 the License, or (at your option) any later version.
15 This program is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 General Public License for more details.
20 You should have received a copy of the GNU General Public
21 License along with this program; if not, write to the Free
22 Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
/* Daemon name, defined in another file; handle_client_request() copies
 * it into jcr->client_name. */
30 extern char my_name[];
32 /* Imported functions */
/* Handler wired to the "status" entry of cmds[]. */
33 extern int status_cmd(JCR *jcr);
35 /* Forward referenced functions */
/* Command handlers. The dispatch loop calls them as func(jcr) and treats
 * a zero return as an error that ends the session. */
36 static int backup_cmd(JCR *jcr);
37 static int cancel_cmd(JCR *jcr);
38 static int setdebug_cmd(JCR *jcr);
39 static int estimate_cmd(JCR *jcr);
40 static int exclude_cmd(JCR *jcr);
41 static int hello_cmd(JCR *jcr);
42 static int job_cmd(JCR *jcr);
43 static int include_cmd(JCR *jcr);
44 static int level_cmd(JCR *jcr);
45 static int verify_cmd(JCR *jcr);
46 static int restore_cmd(JCR *jcr);
47 static int storage_cmd(JCR *jcr);
48 static int session_cmd(JCR *jcr);
/* Receives one Storage daemon reply on sd and compares it with resp;
 * cmd labels the error message on a mismatch. */
49 static int response(BSOCK *sd, char *resp, char *cmd);
/* Cleanup callback handed to new_jcr() when the JCR is created. */
50 static void filed_free_jcr(JCR *jcr);
54 /* Exported functions */
62 * The following are the recognized commands from the Director.
/* Dispatch table: each entry pairs a command prefix with its handler.
 * handle_client_request() compares only strlen(cmd) characters of the
 * incoming message, so the strings act as prefixes (hence the trailing
 * '=' or space on some entries). The {NULL, NULL} entry stops the scan.
 *
 * NOTE(review): hello_cmd and job_cmd are declared, and hello_cmd is
 * special-cased by the dispatch loop, but no entries for them are visible
 * in this table -- confirm the table is complete. */
64 static struct s_cmds cmds[] = {
65 {"backup", backup_cmd},
66 {"cancel", cancel_cmd},
67 {"setdebug=", setdebug_cmd},
68 {"estimate", estimate_cmd},
69 {"exclude", exclude_cmd},
71 {"include", include_cmd},
73 {"level = ", level_cmd},
74 {"restore", restore_cmd},
75 {"session", session_cmd},
76 {"status", status_cmd},
77 {"storage ", storage_cmd},
78 {"verify", verify_cmd},
79 {NULL, NULL} /* list terminator */
82 /* Commands received from director that need scanning */
/* sscanf() format for the Job command; the %127s and %100s widths cap how
 * many characters are stored for the Job= and Authorization= fields. */
83 static char jobcmd[] = "JobId=%d Job=%127s SDid=%d SDtime=%d Authorization=%100s";
/* NOTE(review): storaddr and sessioncmd scan with an unbounded %s into
 * jcr->stored_addr and jcr->VolumeName; the sizes of those fields are not
 * visible here -- confirm an oversized Director message cannot overflow
 * them, or add a field width. */
84 static char storaddr[] = "storage address=%s port=%d\n";
85 static char sessioncmd[] = "session %s %ld %ld %ld %ld %ld %ld\n";
/* Scanned into a buffer of dir->msglen+1 bytes in restore_cmd(). */
86 static char restorecmd[] = "restore where=%s\n";
88 /* Responses sent to Director */
89 static char errmsg[] = "2999 Invalid command\n";
90 static char no_auth[] = "2998 No Authorization\n";
91 static char OKinc[] = "2000 OK include\n";
92 static char OKest[] = "2000 OK estimate files=%ld bytes=%ld\n";
93 static char OKexc[] = "2000 OK exclude\n";
94 static char OKlevel[] = "2000 OK level\n";
95 static char OKbackup[] = "2000 OK backup\n";
96 static char OKverify[] = "2000 OK verify\n";
97 static char OKrestore[] = "2000 OK restore\n";
98 static char OKsession[] = "2000 OK session\n";
99 static char OKstore[] = "2000 OK storage\n";
100 static char OKjob[] = "2000 OK Job\n";
101 static char OKsetdebug[] = "2000 OK setdebug=%d\n";
102 static char BADjob[] = "2901 Bad Job\n";
104 /* Responses received from Storage Daemon */
/* OK_end, OK_data and OK_append are compared with strcmp() in response(),
 * so a reply must match them exactly, trailing newline included. */
105 static char OK_end[] = "3000 OK end\n";
/* Used as a sscanf() format: the %d yields the session ticket. */
106 static char OK_open[] = "3000 OK open ticket = %d\n";
107 static char OK_data[] = "3000 OK data\n";
108 static char OK_append[] = "3000 OK append data\n";
111 /* Commands sent to Storage Daemon */
/* The %d in the formats below is filled with jcr->Ticket by the callers. */
112 static char append_open[] = "append open session\n";
113 static char append_data[] = "append data %d\n";
114 static char append_end[] = "append end session %d\n";
115 static char append_close[] = "append close session %d\n";
116 static char read_open[] = "read open session = %s %ld %ld %ld %ld %ld %ld\n";
117 static char read_data[] = "read data %d\n";
118 static char read_close[] = "read close session %d\n";
121 * Accept requests from a Director
123 * NOTE! We are running as a separate thread
125 * Send output one line
126 * at a time followed by a zero length transmission.
128 * Return when the connection is terminated or there
131 * Basic task here is:
132 * Authenticate Director (during Hello command).
133 * Accept commands one at a time from the Director
/* Services one Director connection: builds a JCR for it, then repeatedly
 * receives a message, finds the matching cmds[] entry and runs its
 * handler, and finally releases the find-files context and the JCR.
 *
 * dirp is the Director's BSOCK, passed as void *.
 */
137 void *handle_client_request(void *dirp)
141 BSOCK *dir = (BSOCK *) dirp;
/* filed_free_jcr is registered as the cleanup callback for this JCR. */
143 jcr = new_jcr(sizeof(JCR), filed_free_jcr); /* create JCR */
144 jcr->dir_bsock = dir;
145 jcr->ff = init_find_files();
146 jcr->start_time = time(NULL);
147 jcr->last_fname = (char *) get_pool_memory(PM_FNAME);
148 jcr->client_name = bstrdup(my_name);
150 /**********FIXME******* add command handler error code */
/* Loop until a handler reports an error or the connection drops. */
152 for (quit=0; !quit;) {
155 if (bnet_recv(dir) <= 0) {
156 break; /* connection terminated */
/* NUL-terminate the received bytes so they can be used as a C string. */
158 dir->msg[dir->msglen] = 0;
159 Dmsg1(9, "<dird: %s", dir->msg);
161 for (i=0; cmds[i].cmd; i++) {
/* Prefix match: only strlen(cmds[i].cmd) characters are compared. */
162 if (strncmp(cmds[i].cmd, dir->msg, strlen(cmds[i].cmd)) == 0) {
/* Until hello_cmd has set jcr->authenticated, any other command is
 * answered with no_auth. */
163 if (!jcr->authenticated && cmds[i].func != hello_cmd) {
164 bnet_fsend(dir, no_auth);
/* A zero return from the handler ends the session. */
167 if (!cmds[i].func(jcr)) { /* do command */
168 quit = TRUE; /* error, get out */
169 Dmsg0(20, "Command error\n");
171 found = TRUE; /* indicate command found */
175 if (!found) { /* command not found */
176 bnet_fsend(dir, errmsg);
181 Dmsg0(20, "Calling term_find_files\n");
182 term_find_files(jcr->ff);
183 Dmsg0(20, "Done with term_find_files\n");
184 free_jcr(jcr); /* destroy JCR record */
185 Dmsg0(20, "Done with free_jcr\n");
190 * Hello from Director: he must identify himself and provide his
/* Handles the Director's Hello: runs authenticate_director() and, when it
 * succeeds, sets jcr->authenticated so that the dispatch loop will accept
 * the other commands. */
193 static int hello_cmd(JCR *jcr)
195 Dmsg0(20, "Calling Authenticate\n");
196 if (!authenticate_director(jcr)) {
199 Dmsg0(20, "OK Authenticate\n");
200 jcr->authenticated = TRUE;
/* Marks another job as cancelled. Parses "cancel Job=<name>", looks the
 * job up by full name, and replies to the Director with
 *   2901 when no such job exists,
 *   2001 after setting that job's JobStatus to JS_Cancelled,
 *   2902 when the command could not be scanned.
 * BNET_EOF is then signalled to the Director. */
207 static int cancel_cmd(JCR *jcr)
209 BSOCK *dir = jcr->dir_bsock;
210 char Job[MAX_NAME_LENGTH];
/* NOTE(review): %127s stores up to 128 bytes including the NUL; this
 * assumes MAX_NAME_LENGTH >= 128 -- confirm. */
213 if (sscanf(dir->msg, "cancel Job=%127s", Job) == 1) {
214 if (!(cjcr=get_jcr_by_full_name(Job))) {
215 bnet_fsend(dir, "2901 Job %s not found.\n", Job);
217 cjcr->JobStatus = JS_Cancelled;
219 bnet_fsend(dir, "2001 Job %s marked to be cancelled.\n", Job);
222 bnet_fsend(dir, "2902 Error scanning cancel command.\n");
224 bnet_sig(dir, BNET_EOF);
230 * Set debug level as requested by the Director
/* Parses "setdebug=<n>" from the Director. A missing or negative level is
 * answered with a 2991 error that echoes the message; otherwise the
 * function returns the result of sending OKsetdebug with the level. */
233 static int setdebug_cmd(JCR *jcr)
235 BSOCK *dir = jcr->dir_bsock;
238 Dmsg1(10, "setdebug_cmd: %s", dir->msg);
239 if (sscanf(dir->msg, "setdebug=%d", &level) != 1 || level < 0) {
240 bnet_fsend(dir, "2991 Bad setdebug command: %s\n", dir->msg);
244 return bnet_fsend(dir, OKsetdebug, level);
/* Replies to the Director with OKest, reporting jcr->JobFiles and
 * jcr->JobBytes; returns the result of that send.
 * NOTE(review): OKest formats both values with %ld -- confirm that both
 * fields are of type long. */
248 static int estimate_cmd(JCR *jcr)
250 BSOCK *dir = jcr->dir_bsock;
252 return bnet_fsend(dir, OKest, jcr->JobFiles, jcr->JobBytes);
256 * Get JobId and Storage Daemon Authorization key from Director
/* Scans the Job command (format jobcmd) into the JCR: JobId, Job,
 * VolSessionId and VolSessionTime, plus the Storage daemon authorization
 * key, which ends up duplicated in jcr->sd_auth_key. On a bad scan the
 * Director gets BADjob and a fatal message is issued; on success the
 * function returns the result of sending OKjob. */
258 static int job_cmd(JCR *jcr)
260 BSOCK *dir = jcr->dir_bsock;
/* Scratch buffer sized from the message length.
 * NOTE(review): allocated with get_memory() but released with
 * free_pool_memory() -- confirm that pairing is intended. */
263 sd_auth_key = (char *) get_memory(dir->msglen);
264 if (sscanf(dir->msg, jobcmd, &jcr->JobId, jcr->Job,
265 &jcr->VolSessionId, &jcr->VolSessionTime,
267 bnet_fsend(dir, BADjob);
268 Emsg1(M_FATAL, 0, _("Bad Job Command: %s\n"), dir->msg);
269 free_pool_memory(sd_auth_key);
/* Keep a private copy in the JCR, then drop the scratch buffer. */
272 jcr->sd_auth_key = bstrdup(sd_auth_key);
273 free_pool_memory(sd_auth_key);
274 Dmsg2(20, "JobId=%d Auth=%s\n", jcr->JobId, jcr->sd_auth_key);
275 return bnet_fsend(dir, OKjob);
280 * Get list of files/directories to include from Director
/* Reads include entries from the Director for as long as bnet_recv()
 * returns a positive length. Each message is NUL-terminated, stripped of
 * trailing junk and added to the include list of jcr->ff. Afterwards the
 * function returns the result of sending OKinc. */
283 static int include_cmd(JCR *jcr)
285 BSOCK *dir = jcr->dir_bsock;
287 while (bnet_recv(dir) > 0) {
288 dir->msg[dir->msglen] = 0;
289 strip_trailing_junk(dir->msg);
290 Dmsg1(10, "filed<dird: include file %s\n", dir->msg);
291 add_fname_to_include_list(jcr->ff, 1, dir->msg);
294 return bnet_fsend(dir, OKinc);
298 * Get list of files to exclude from Director
/* Reads exclude entries from the Director for as long as bnet_recv()
 * returns a positive length. For each message the first space-delimited
 * token (the options) and the spaces after it are skipped, and the rest
 * is added to the exclude list of jcr->ff. Afterwards the function
 * returns the result of sending OKexc. */
301 static int exclude_cmd(JCR *jcr)
303 BSOCK *dir = jcr->dir_bsock;
306 while (bnet_recv(dir) > 0) {
307 dir->msg[dir->msglen] = 0;
308 strip_trailing_junk(dir->msg);
309 /* Skip leading options */
310 for (p=dir->msg; *p && *p != ' '; p++)
/* Then skip the run of spaces separating the options from the name. */
313 for ( ; *p && *p == ' '; p++)
315 add_fname_to_exclude_list(jcr->ff, p);
/* NOTE(review): this logs the whole message (options included), not the
 * name p that was actually added. */
316 Dmsg1(10, "<dird: exclude file %s\n", dir->msg);
319 return bnet_fsend(dir, OKexc);
323 * Get backup level from Director
/* Parses "level = <word>" from the Director and records the backup level
 * in the JCR:
 *   "full"    -> save_level = L_FULL
 *   "since"   -> save_level = L_SINCE, followed by a date/time that is
 *                scanned into a struct tm; jcr->incremental is set to 1
 *   "catalog", "init" -> accepted, nothing done
 * Any other word, or a failed scan, raises a fatal job message. On
 * success the function returns the result of sending OKlevel. */
326 static int level_cmd(JCR *jcr)
328 BSOCK *dir = jcr->dir_bsock;
/* Buffer for the level word, sized from the message length.
 * NOTE(review): no release of this buffer is visible in this function --
 * confirm it is freed on every exit path. */
333 level = (char *) get_memory(dir->msglen);
334 Dmsg1(10, "level_cmd: %s", dir->msg);
335 if (sscanf(dir->msg, "level = %s ", level) != 1) {
336 Jmsg1(jcr, M_FATAL, 0, _("Bad level command: %s\n"), dir->msg);
341 * Full backup requested
343 if (strcmp(level, "full") == 0) {
344 jcr->save_level = L_FULL;
346 * Backup requested since <date> <time>
347 * This form is also used for incremental and differential
349 } else if (strcmp(level, "since") == 0) {
350 jcr->save_level = L_SINCE;
/* All six date/time fields must be present. */
351 if (sscanf(dir->msg, "level = since %d-%d-%d %d:%d:%d",
352 &tm.tm_year, &tm.tm_mon, &tm.tm_mday,
353 &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 6) {
354 Jmsg1(jcr, M_FATAL, 0, "Bad scan of date/time: %s\n", dir->msg);
360 tm.tm_wday = tm.tm_yday = 0;
363 Dmsg1(90, "Got since time: %s", ctime(&mtime));
364 jcr->incremental = 1;
366 } else if (strcmp(level, "catalog") == 0) {
367 /* nothing for now */
368 } else if (strcmp(level, "init") == 0) {
369 /* nothing for now */
371 Jmsg1(jcr, M_FATAL, 0, "Unknown backup level: %s\n", level);
376 return bnet_fsend(dir, OKlevel);
380 * Get session parameters from Director -- this is for a Restore command
/* Scans the session command (format sessioncmd) into the JCR: VolumeName,
 * VolSessionId, VolSessionTime, StartFile, EndFile, StartBlock and
 * EndBlock. All seven fields are required; otherwise a fatal message is
 * issued. On success the function returns the result of sending
 * OKsession.
 * NOTE(review): the volume name is read with an unbounded %s and the
 * numbers with %ld -- confirm the size of VolumeName and that the numeric
 * fields are of type long. */
382 static int session_cmd(JCR *jcr)
384 BSOCK *dir = jcr->dir_bsock;
386 if (sscanf(dir->msg, sessioncmd, jcr->VolumeName,
387 &jcr->VolSessionId, &jcr->VolSessionTime,
388 &jcr->StartFile, &jcr->EndFile,
389 &jcr->StartBlock, &jcr->EndBlock) != 7) {
390 Emsg1(M_FATAL, 0, "Bad session command: %s", dir->msg);
394 return bnet_fsend(dir, OKsession);
398 * Get address of storage daemon from Director
/* Scans the Storage daemon address and port from the Director's message,
 * connects to that daemon, stores the socket in jcr->store_bsock, sends
 * the Hello for this job and authenticates. Failures to scan, connect or
 * authenticate raise fatal messages. On success the function returns the
 * result of sending OKstore to the Director. */
401 static int storage_cmd(JCR *jcr)
403 int stored_port; /* storage daemon port */
404 BSOCK *dir = jcr->dir_bsock;
405 BSOCK *sd; /* storage daemon bsock */
/* NOTE(review): %s receives &jcr->stored_addr and has no field width.
 * That is only right if stored_addr is a char array, and it is only safe
 * if the array is large enough -- confirm both. */
407 if (sscanf(dir->msg, storaddr, &jcr->stored_addr, &stored_port) != 2) {
408 Emsg1(M_FATAL, 0, _("Bad storage command: %s\n"), dir->msg);
411 Dmsg2(30, "Got storage: %s:%d\n", jcr->stored_addr, stored_port);
412 /* Open command communications with Storage daemon */
413 /* Try to connect for 1 hour at 10 second intervals */
414 sd = bnet_connect(jcr, 10, 3600, _("Storage daemon"),
415 jcr->stored_addr, NULL, stored_port, 1);
417 Jmsg2(jcr, M_FATAL, 0, _("Failed to connect to Storage daemon: %s:%d\n"),
418 jcr->stored_addr, stored_port);
/* The socket is kept in the JCR; filed_free_jcr() closes it later. */
422 jcr->store_bsock = sd;
424 bnet_fsend(sd, "Hello Start Job %s\n", jcr->Job);
425 if (!authenticate_storagedaemon(jcr)) {
426 Jmsg(jcr, M_FATAL, 0, _("Failed to authenticate Storage daemon.\n"));
430 /* Send OK to Director */
431 return bnet_fsend(dir, OKstore);
436 * Do a backup. For now, we handle only Full and Incremental.
/* Runs a backup over the Storage daemon socket already held in
 * jcr->store_bsock. Visible sequence:
 *   1. acknowledge the Director with OKbackup;
 *   2. send append_open and scan the ticket out of the reply;
 *   3. send append_data and expect OK_data;
 *   4. stream the files with blast_data_to_storage_daemon();
 *   5. expect OK_append, send append_end and expect OK_end;
 *   6. send append_close and drain whatever else the daemon returns;
 *   7. signal BNET_EOF to the Storage daemon and to the Director.
 * Failures set jcr->JobStatus to JS_ErrorTerminated.
 *
 * Returns non-zero only when jcr->JobStatus ended as JS_Terminated.
 */
438 static int backup_cmd(JCR *jcr)
441 BSOCK *dir = jcr->dir_bsock;
442 BSOCK *sd = jcr->store_bsock;
445 jcr->JobStatus = JS_Blocked;
446 jcr->JobType = JT_BACKUP;
447 Dmsg1(100, "begin backup ff=%p\n", jcr->ff);
450 Emsg0(M_FATAL, 0, _("Cannot contact Storage daemon\n"));
451 jcr->JobStatus = JS_ErrorTerminated;
455 bnet_fsend(dir, OKbackup);
456 Dmsg1(10, "bfiled>dird: %s", dir->msg);
459 * Send Append Open Session to Storage daemon
461 bnet_fsend(sd, append_open);
462 Dmsg1(10, ">stored: %s", sd->msg);
464 * Expect to receive back the Ticket number
466 if (bnet_recv(sd) > 0) {
467 Dmsg1(10, "<stored: %s", sd->msg);
/* OK_open doubles as the scan format; its %d is the ticket. */
468 if (sscanf(sd->msg, OK_open, &jcr->Ticket) != 1) {
469 Emsg1(M_FATAL, 0, _("Bad response to append open: %s\n"), sd->msg);
470 jcr->JobStatus = JS_ErrorTerminated;
473 Dmsg1(10, "Got Ticket=%d\n", jcr->Ticket);
475 Emsg0(M_FATAL, 0, _("Bad response from stored to open command\n"));
476 jcr->JobStatus = JS_ErrorTerminated;
481 * Send Append data command to Storage daemon
483 bnet_fsend(sd, append_data, jcr->Ticket);
484 Dmsg1(10, ">stored: %s", sd->msg);
487 * Expect to get OK data
/* NOTE(review): this trace runs before response() has received the reply,
 * so sd->msg does not yet hold the Storage daemon's answer. */
489 Dmsg1(10, "<stored: %s", sd->msg);
490 if (!response(sd, OK_data, "Append Data")) {
491 jcr->JobStatus = JS_ErrorTerminated;
496 * Send Files to Storage daemon
498 Dmsg1(100, "begin blast ff=%p\n", jcr->ff);
/* data_port is not declared in the visible part of this function. */
499 if (!blast_data_to_storage_daemon(jcr, NULL, data_port)) {
500 jcr->JobStatus = JS_ErrorTerminated;
502 jcr->JobStatus = JS_Terminated;
504 * Expect to get response to append_data from Storage daemon
506 if (!response(sd, OK_append, "Append Data")) {
507 jcr->JobStatus = JS_ErrorTerminated;
512 * Send Append End Data to Storage daemon
514 bnet_fsend(sd, append_end, jcr->Ticket);
516 if (!response(sd, OK_end, "Append End")) {
517 jcr->JobStatus = JS_ErrorTerminated;
522 * Send Append Close to Storage daemon
524 bnet_fsend(sd, append_close, jcr->Ticket);
525 while ((len = bnet_recv(sd)) > 0) {
526 /* discard anything else returned from SD */
529 Emsg2(M_FATAL, 0, _("<stored: net_recv len=%d: ERR=%s\n"), len, bnet_strerror(sd));
530 jcr->JobStatus = JS_ErrorTerminated;
536 /* Inform Storage daemon that we are done */
538 bnet_sig(sd, BNET_EOF);
541 /* Inform Director that we are done */
542 bnet_sig(dir, BNET_EOF);
/* Success means the status is still JS_Terminated at this point. */
544 return jcr->JobStatus == JS_Terminated;
548 * Do a Verify for Director
/* Marks the job as JT_VERIFY and acknowledges the Director with OKverify.
 * Returns the result of signalling BNET_EOF to the Director. */
551 static int verify_cmd(JCR *jcr)
553 BSOCK *dir = jcr->dir_bsock;
555 jcr->JobType = JT_VERIFY;
556 bnet_fsend(dir, OKverify);
557 Dmsg1(10, "bfiled>dird: %s", dir->msg);
561 /* Inform Director that we are done */
562 return bnet_sig(dir, BNET_EOF);
566 * Do a Restore for Director
/* Runs a restore over the Storage daemon socket held in jcr->store_bsock.
 * Visible sequence:
 *   1. scan the "where" base directory from the Director's command and
 *      acknowledge with OKrestore;
 *   2. send read_open with the session parameters stored in the JCR and
 *      scan the ticket out of the reply;
 *   3. send read_data and expect OK_data;
 *   4. call do_restore();
 *   5. send read_close, read one reply, then signal BNET_EOF to the
 *      Storage daemon and to the Director.
 */
569 static int restore_cmd(JCR *jcr)
574 BSOCK *dir = jcr->dir_bsock;
575 BSOCK *sd = jcr->store_bsock;
579 * Scan WHERE (base directory for restore) from command
581 Dmsg0(50, "restore command\n");
582 /* Pickup where string */
/* msglen+1 bytes: room for any substring of the message plus the NUL. */
583 where = (char *) get_memory(dir->msglen+1);
/* NOTE(review): the sscanf() result is not checked here -- confirm that
 * where holds a valid string when the scan fails. */
585 sscanf(dir->msg, restorecmd, where);
586 Dmsg1(50, "Got where=%s\n", where);
589 bnet_fsend(dir, OKrestore);
590 Dmsg1(10, "bfiled>dird: %s", dir->msg);
592 jcr->JobType = JT_RESTORE;
593 jcr->JobStatus = JS_Blocked;
/* Scratch buffer passed to do_restore(); released before returning. */
594 ip_addr = (char *) get_pool_memory(PM_FNAME);
596 Dmsg4(20, "VolSessId=%ld VolsessT=%ld SF=%ld EF=%ld\n",
597 jcr->VolSessionId, jcr->VolSessionTime, jcr->StartFile, jcr->EndFile);
/* NOTE(review): logs the literal "DummyVolume", not jcr->VolumeName. */
598 Dmsg2(20, "JobId=%d vol=%s\n", jcr->JobId, "DummyVolume");
601 * Open Read Session with Storage daemon
603 bnet_fsend(sd, read_open, jcr->VolumeName,
604 jcr->VolSessionId, jcr->VolSessionTime, jcr->StartFile, jcr->EndFile,
605 jcr->StartBlock, jcr->EndBlock);
606 Dmsg1(10, ">stored: %s", sd->msg);
611 if ((len = bnet_recv(sd)) > 0) {
612 Dmsg1(10, "bfiled<stored: %s", sd->msg);
/* OK_open doubles as the scan format; its %d is the ticket. */
613 if (sscanf(sd->msg, OK_open, &jcr->Ticket) != 1) {
614 Emsg1(M_FATAL, 0, _("Bad response to read open: %s\n"), sd->msg);
617 Dmsg1(10, "bfiled: got Ticket=%d\n", jcr->Ticket);
619 Emsg0(M_FATAL, 0, _("Bad response from stored to read open command\n"));
624 * Start read of data with Storage daemon
626 bnet_fsend(sd, read_data, jcr->Ticket);
627 Dmsg1(10, ">stored: %s", sd->msg);
632 if (!response(sd, OK_data, "Read Data")) {
637 * Do restore of files and data
639 do_restore(jcr, ip_addr, data_port);
642 * Send Close session command to Storage daemon
644 bnet_fsend(sd, read_close, jcr->Ticket);
645 Dmsg1(30, "bfiled>stored: %s", sd->msg);
647 /* ****FIXME**** check response */
/* The reply is read but neither its length nor its content is examined. */
648 bnet_recv(sd); /* get OK */
650 /* Inform Storage daemon that we are done */
651 bnet_sig(sd, BNET_EOF);
653 /* Inform Director that we are done */
654 bnet_sig(dir, BNET_EOF);
657 free_pool_memory(ip_addr);
658 Dmsg0(30, "Done in job.c\n");
665 * Destroy the Job Control Record and associated
666 * resources (sockets).
/* JCR cleanup callback registered through new_jcr(). Closes the Storage
 * daemon socket when one is open and releases the pool memory held in
 * jcr->where and jcr->last_fname. */
668 static void filed_free_jcr(JCR *jcr)
670 if (jcr->store_bsock) {
671 bnet_close(jcr->store_bsock);
/* NOTE(review): no NULL guard for jcr->where is visible on this call,
 * unlike last_fname below -- confirm one exists. */
674 free_pool_memory(jcr->where);
676 if (jcr->last_fname) {
677 free_pool_memory(jcr->last_fname);
683 * Get response from Storage daemon to a command we
684 * sent. Check that the response is OK.
686 * Returns: 0 on failure
/* Receives one message from the Storage daemon and compares it with the
 * expected reply.
 *
 *   sd    Storage daemon socket
 *   resp  expected reply; must match byte for byte, trailing newline
 *         included, because strcmp() is used
 *   cmd   name of the command, used only in the error messages
 *
 * A mismatch and a failed receive each raise their own fatal message.
 *
 * The forward declaration of this function is static; omitting the
 * keyword here leaves it with internal linkage, but repeating "static"
 * would make that obvious to readers.
 */
689 int response(BSOCK *sd, char *resp, char *cmd)
696 if ((n = bnet_recv(sd)) > 0) {
698 if (strcmp(sd->msg, resp) == 0) {
702 /* ********FIXME******** segfault if the following is executed */
704 Emsg3(M_FATAL, 0, _("<stored: bad response to %s: wanted: %s, got: %s\n"),
/* Receive failed: report the socket error text instead of a reply. */
707 Emsg2(M_FATAL, 0, _("<stored: bad response to %s command: ERR=%s\n"),
708 cmd, bnet_strerror(sd));