2 * Bacula File Daemon Job processing
4 * Kern Sibbald, October MM
10 Copyright (C) 2000, 2001, 2002 Kern Sibbald and John Walker
12 This program is free software; you can redistribute it and/or
13 modify it under the terms of the GNU General Public License as
14 published by the Free Software Foundation; either version 2 of
15 the License, or (at your option) any later version.
17 This program is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 General Public License for more details.
22 You should have received a copy of the GNU General Public
23 License along with this program; if not, write to the Free
24 Software Foundation, Inc., 59 Temple Place - Suite 330, Boston,
32 extern char my_name[];
34 /* Imported functions */
35 extern int status_cmd(JCR *jcr);
37 /* Forward referenced functions */
/*
 * Command handlers, one per Director command.  Each receives the job's
 * JCR; handle_client_request() treats a zero return as an error and
 * stops reading further commands.
 */
38 static int backup_cmd(JCR *jcr);
39 static int cancel_cmd(JCR *jcr);
40 static int setdebug_cmd(JCR *jcr);
41 static int estimate_cmd(JCR *jcr);
42 static int exclude_cmd(JCR *jcr);
43 static int hello_cmd(JCR *jcr);
44 static int job_cmd(JCR *jcr);
45 static int include_cmd(JCR *jcr);
46 static int level_cmd(JCR *jcr);
47 static int verify_cmd(JCR *jcr);
48 static int restore_cmd(JCR *jcr);
49 static int storage_cmd(JCR *jcr);
50 static int session_cmd(JCR *jcr);
/*
 * NOTE(review): declared static here, but the definition of response()
 * further down is written without `static`.  C keeps the internal linkage
 * from this declaration; consider making the two spellings agree.
 */
51 static int response(BSOCK *sd, char *resp, char *cmd);
/* Cleanup routine handed to new_jcr() by handle_client_request(). */
52 static void filed_free_jcr(JCR *jcr);
56 /* Exported functions */
64 * The following are the recognized commands from the Director.
/*
 * Dispatch table searched in order by handle_client_request().  The
 * dispatcher compares only the first strlen(cmd) bytes of the incoming
 * message, so each string is a prefix: the trailing '=' and spaces in
 * entries such as "setdebug=" and "level = " are part of the match.
 */
66 static struct s_cmds cmds[] = {
67 {"backup", backup_cmd},
68 {"cancel", cancel_cmd},
69 {"setdebug=", setdebug_cmd},
70 {"estimate", estimate_cmd},
71 {"exclude", exclude_cmd},
73 {"include", include_cmd},
75 {"level = ", level_cmd},
76 {"restore", restore_cmd},
77 {"session", session_cmd},
78 {"status", status_cmd},
79 {"storage ", storage_cmd},
80 {"verify", verify_cmd},
81 {NULL, NULL} /* list terminator */
84 /* Commands received from director that need scanning */
/*
 * Scanned by job_cmd().  The Job name is limited to 127 characters and
 * the authorization key to 100 by the field widths below.
 */
85 static char jobcmd[] = "JobId=%d Job=%127s SDid=%d SDtime=%d Authorization=%100s";
/*
 * NOTE(review): the %s conversions in storaddr, sessioncmd and restorecmd
 * carry no field width, so the scan itself does not bound what is written
 * to the destination buffer.
 */
86 static char storaddr[] = "storage address=%s port=%d\n";
/*
 * NOTE(review): session_cmd() stores jcr->VolSessionId and
 * jcr->VolSessionTime through %ld here, while job_cmd() stores the same
 * two fields through %d (SDid/SDtime in jobcmd).  Confirm the field type
 * and make the two formats agree.
 */
87 static char sessioncmd[] = "session %s %ld %ld %ld %ld %ld %ld\n";
88 static char restorecmd[] = "restore where=%s\n";
90 /* Responses sent to Director */
91 static char errmsg[] = "2999 Invalid command\n";
92 static char no_auth[] = "2998 No Authorization\n";
93 static char OKinc[] = "2000 OK include\n";
94 static char OKest[] = "2000 OK estimate files=%ld bytes=%ld\n";
95 static char OKexc[] = "2000 OK exclude\n";
96 static char OKlevel[] = "2000 OK level\n";
97 static char OKbackup[] = "2000 OK backup\n";
98 static char OKverify[] = "2000 OK verify\n";
99 static char OKrestore[] = "2000 OK restore\n";
100 static char OKsession[] = "2000 OK session\n";
101 static char OKstore[] = "2000 OK storage\n";
102 static char OKjob[] = "2000 OK Job\n";
103 static char OKsetdebug[] = "2000 OK setdebug=%d\n";
104 static char BADjob[] = "2901 Bad Job\n";
106 /* Responses received from Storage Daemon */
107 static char OK_end[] = "3000 OK end\n";
/*
 * Used as a scan format for the ticket number in the reply to both
 * append_open (backup_cmd) and read_open (restore_cmd).
 */
108 static char OK_open[] = "3000 OK open ticket = %d\n";
109 static char OK_data[] = "3000 OK data\n";
110 static char OK_append[] = "3000 OK append data\n";
113 /* Commands sent to Storage Daemon */
114 static char append_open[] = "append open session\n";
115 static char append_data[] = "append data %d\n";
116 static char append_end[] = "append end session %d\n";
117 static char append_close[] = "append close session %d\n";
/*
 * Filled by restore_cmd() with VolumeName, VolSessionId, VolSessionTime,
 * StartFile, EndFile, StartBlock and EndBlock, in that order.
 */
118 static char read_open[] = "read open session = %s %ld %ld %ld %ld %ld %ld\n";
119 static char read_data[] = "read data %d\n";
120 static char read_close[] = "read close session %d\n";
123 * Accept requests from a Director
125 * NOTE! We are running as a separate thread
127 * Send output one line
128 * at a time followed by a zero length transmission.
130 * Return when the connection is terminated or there
133 * Basic task here is:
134 * Authenticate Director (during Hello command).
135 * Accept commands one at a time from the Director
/*
 * Service one Director connection.
 *
 * dirp is the Director's BSOCK.  A JCR is created for it, then messages
 * are read and dispatched through cmds[] until the connection ends or a
 * handler returns zero.  Afterwards the find-files state and the JCR are
 * released.
 */
139 void *handle_client_request(void *dirp)
143 BSOCK *dir = (BSOCK *) dirp;
145 jcr = new_jcr(sizeof(JCR), filed_free_jcr); /* create JCR */
146 jcr->dir_bsock = dir;
147 jcr->ff = init_find_files();
148 jcr->start_time = time(NULL);
/* last_fname is released again in filed_free_jcr(). */
149 jcr->last_fname = (char *) get_pool_memory(PM_FNAME);
150 jcr->client_name = bstrdup(my_name);
152 /**********FIXME******* add command handler error code */
154 for (quit=0; !quit;) {
157 if (bnet_recv(dir) <= 0) {
158 break; /* connection terminated */
/* NUL-terminate the received bytes so they can be used as a C string. */
160 dir->msg[dir->msglen] = 0;
161 Dmsg1(9, "<dird: %s", dir->msg);
163 for (i=0; cmds[i].cmd; i++) {
/* Prefix match: only the first strlen(cmds[i].cmd) bytes are compared. */
164 if (strncmp(cmds[i].cmd, dir->msg, strlen(cmds[i].cmd)) == 0) {
/*
 * Until hello_cmd() has set jcr->authenticated, every other command is
 * answered with no_auth.
 */
165 if (!jcr->authenticated && cmds[i].func != hello_cmd) {
166 bnet_fsend(dir, no_auth);
169 if (!cmds[i].func(jcr)) { /* do command */
170 quit = TRUE; /* error, get out */
171 Dmsg0(20, "Command error\n");
173 found = TRUE; /* indicate command found */
177 if (!found) { /* command not found */
178 bnet_fsend(dir, errmsg);
183 Dmsg0(20, "Calling term_find_files\n");
184 term_find_files(jcr->ff);
185 Dmsg0(20, "Done with term_find_files\n");
186 free_jcr(jcr); /* destroy JCR record */
187 Dmsg0(20, "Done with free_jcr\n");
192 * Hello from Director: he must identify himself and provide his
/*
 * Handle the Director's hello: run authenticate_director() on the JCR and,
 * past that check, mark the JCR as authenticated so that
 * handle_client_request() will accept the other commands.
 */
195 static int hello_cmd(JCR *jcr)
197 Dmsg0(20, "Calling Authenticate\n");
198 if (!authenticate_director(jcr)) {
201 Dmsg0(20, "OK Authenticate\n");
202 jcr->authenticated = TRUE;
/*
 * Cancel a job named by the Director ("cancel Job=<name>").
 *
 * The job is looked up with get_jcr_by_full_name().  The Director gets a
 * 2901 reply if it is not found, a 2001 reply once it is marked, or a 2902
 * reply if the command could not be scanned.
 */
209 static int cancel_cmd(JCR *jcr)
211 BSOCK *dir = jcr->dir_bsock;
212 char Job[MAX_NAME_LENGTH];
/*
 * %127s stores at most 127 characters plus the terminator.
 * NOTE(review): confirm MAX_NAME_LENGTH is at least 128.
 */
215 if (sscanf(dir->msg, "cancel Job=%127s", Job) == 1) {
216 if (!(cjcr=get_jcr_by_full_name(Job))) {
217 bnet_fsend(dir, "2901 Job %s not found.\n", Job);
/* Cancellation is only flagged here, by changing the target job's status. */
219 cjcr->JobStatus = JS_Cancelled;
221 bnet_fsend(dir, "2001 Job %s marked to be cancelled.\n", Job);
224 bnet_fsend(dir, "2902 Error scanning cancel command.\n");
/* Close the reply with an EOF signal to the Director. */
226 bnet_sig(dir, BNET_EOF);
232 * Set debug level as requested by the Director
/*
 * Parse "setdebug=<n>" from the Director.  A value that does not scan, or
 * that is negative, is answered with a 2991 error echoing the command;
 * otherwise the level is echoed back in OKsetdebug and the result of that
 * send is returned.
 */
235 static int setdebug_cmd(JCR *jcr)
237 BSOCK *dir = jcr->dir_bsock;
240 Dmsg1(10, "setdebug_cmd: %s", dir->msg);
241 if (sscanf(dir->msg, "setdebug=%d", &level) != 1 || level < 0) {
242 bnet_fsend(dir, "2991 Bad setdebug command: %s\n", dir->msg);
246 return bnet_fsend(dir, OKsetdebug, level);
/*
 * Reply to the Director with OKest, carrying the JCR's current JobFiles
 * and JobBytes counters.  Returns the result of the send.
 * NOTE(review): OKest formats both counters with %ld -- confirm that
 * matches the field types.
 */
250 static int estimate_cmd(JCR *jcr)
252 BSOCK *dir = jcr->dir_bsock;
254 return bnet_fsend(dir, OKest, jcr->JobFiles, jcr->JobBytes);
258 * Get JobId and Storage Daemon Authorization key from Director
/*
 * Scan the Director's job command (format jobcmd) into the JCR: JobId,
 * Job name and the volume session id/time.  The Storage daemon
 * authorization key is then saved in jcr->sd_auth_key.  A command that
 * does not scan is answered with BADjob; success is answered with OKjob.
 */
260 static int job_cmd(JCR *jcr)
262 BSOCK *dir = jcr->dir_bsock;
/*
 * Scratch buffer sized from the incoming message length.
 * NOTE(review): obtained with get_memory() but released below with
 * free_pool_memory() -- confirm these two are a matching pair.
 */
265 sd_auth_key = (char *) get_memory(dir->msglen);
266 if (sscanf(dir->msg, jobcmd, &jcr->JobId, jcr->Job,
267 &jcr->VolSessionId, &jcr->VolSessionTime,
269 bnet_fsend(dir, BADjob);
270 Emsg1(M_FATAL, 0, _("Bad Job Command: %s\n"), dir->msg);
271 free_pool_memory(sd_auth_key);
/* Keep a private copy in the JCR; the scratch buffer is released right after. */
274 jcr->sd_auth_key = bstrdup(sd_auth_key);
275 free_pool_memory(sd_auth_key);
276 Dmsg2(20, "JobId=%d Auth=%s\n", jcr->JobId, jcr->sd_auth_key);
277 return bnet_fsend(dir, OKjob);
282 * Get list of files/directories to include from Director
/*
 * Receive the include list.  Messages are read from the Director for as
 * long as bnet_recv() returns a positive length.  Each one is
 * NUL-terminated, stripped of trailing junk and added to the include list
 * of jcr->ff.  The list is then acknowledged with OKinc.
 */
285 static int include_cmd(JCR *jcr)
287 BSOCK *dir = jcr->dir_bsock;
289 while (bnet_recv(dir) > 0) {
290 dir->msg[dir->msglen] = 0;
291 strip_trailing_junk(dir->msg);
292 Dmsg1(10, "filed<dird: include file %s\n", dir->msg);
293 add_fname_to_include_list(jcr->ff, 1, dir->msg);
296 return bnet_fsend(dir, OKinc);
300 * Get list of files to exclude from Director
/*
 * Receive the exclude list.  Each message from the Director has the form
 * "<options> <name>".  The first space-delimited word is skipped and the
 * remainder is added to the exclude list of jcr->ff.  The list is then
 * acknowledged with OKexc.
 */
303 static int exclude_cmd(JCR *jcr)
305 BSOCK *dir = jcr->dir_bsock;
308 while (bnet_recv(dir) > 0) {
309 dir->msg[dir->msglen] = 0;
310 strip_trailing_junk(dir->msg);
311 /* Skip leading options */
312 for (p=dir->msg; *p && *p != ' '; p++)
/* Then skip the spaces separating the options from the name. */
315 for ( ; *p && *p == ' '; p++)
317 add_fname_to_exclude_list(jcr->ff, p);
/* The trace prints the whole message, options included, not just p. */
318 Dmsg1(10, "<dird: exclude file %s\n", dir->msg);
321 return bnet_fsend(dir, OKexc);
325 * Get backup level from Director
/*
 * Parse "level = <keyword> ..." from the Director.
 *
 *   full     sets jcr->save_level to L_FULL
 *   since    sets jcr->save_level to L_SINCE, scans a date/time and sets
 *            jcr->incremental
 *   catalog  accepted, no action
 *   init     accepted, no action
 *
 * Any other keyword is reported as an unknown backup level.  The command
 * is acknowledged with OKlevel.
 */
328 static int level_cmd(JCR *jcr)
330 BSOCK *dir = jcr->dir_bsock;
/* Buffer for the level keyword, sized from the incoming message length. */
335 level = (char *) get_memory(dir->msglen);
336 Dmsg1(10, "level_cmd: %s", dir->msg);
337 if (sscanf(dir->msg, "level = %s ", level) != 1) {
338 Jmsg1(jcr, M_FATAL, 0, _("Bad level command: %s\n"), dir->msg);
343 * Full backup requested
345 if (strcmp(level, "full") == 0) {
346 jcr->save_level = L_FULL;
348 * Backup requested since <date> <time>
349 * This form is also used for incremental and differential
351 } else if (strcmp(level, "since") == 0) {
352 jcr->save_level = L_SINCE;
/* Re-scan the whole message for the six numeric date/time fields. */
353 if (sscanf(dir->msg, "level = since %d-%d-%d %d:%d:%d",
354 &tm.tm_year, &tm.tm_mon, &tm.tm_mday,
355 &tm.tm_hour, &tm.tm_min, &tm.tm_sec) != 6) {
356 Jmsg1(jcr, M_FATAL, 0, "Bad scan of date/time: %s\n", dir->msg);
/*
 * NOTE(review): the scan stores year and month exactly as sent.  If tm is
 * a struct tm, it expects years since 1900 and zero-based months, so
 * confirm the values are adjusted before conversion.
 */
362 tm.tm_wday = tm.tm_yday = 0;
365 Dmsg1(90, "Got since time: %s", ctime(&mtime));
366 jcr->incremental = 1;
368 } else if (strcmp(level, "catalog") == 0) {
369 /* nothing for now */
370 } else if (strcmp(level, "init") == 0) {
371 /* nothing for now */
373 Jmsg1(jcr, M_FATAL, 0, "Unknown backup level: %s\n", level);
378 return bnet_fsend(dir, OKlevel);
382 * Get session parameters from Director -- this is for a Restore command
/*
 * Scan the restore session parameters (format sessioncmd) into the JCR:
 * VolumeName, VolSessionId, VolSessionTime, StartFile, EndFile, StartBlock
 * and EndBlock.  All seven must be present, otherwise a fatal message is
 * issued.  Success is acknowledged with OKsession.
 * NOTE(review): the volume name is scanned with an unbounded %s -- confirm
 * jcr->VolumeName is large enough for any name the Director can send.
 */
384 static int session_cmd(JCR *jcr)
386 BSOCK *dir = jcr->dir_bsock;
388 if (sscanf(dir->msg, sessioncmd, jcr->VolumeName,
389 &jcr->VolSessionId, &jcr->VolSessionTime,
390 &jcr->StartFile, &jcr->EndFile,
391 &jcr->StartBlock, &jcr->EndBlock) != 7) {
392 Emsg1(M_FATAL, 0, "Bad session command: %s", dir->msg);
396 return bnet_fsend(dir, OKsession);
400 * Get address of storage daemon from Director
/*
 * Take the Storage daemon's address and port from the Director, connect
 * to it, send the hello line for this job, authenticate it, and report
 * OKstore to the Director.  The connected socket is kept in
 * jcr->store_bsock.
 */
403 static int storage_cmd(JCR *jcr)
405 int stored_port; /* storage daemon port */
406 BSOCK *dir = jcr->dir_bsock;
407 BSOCK *sd; /* storage daemon bsock */
/*
 * NOTE(review): the address is scanned with an unbounded %s and passed as
 * &jcr->stored_addr, whereas the later uses pass jcr->stored_addr.
 * Confirm the field's type and size.
 */
409 if (sscanf(dir->msg, storaddr, &jcr->stored_addr, &stored_port) != 2) {
410 Emsg1(M_FATAL, 0, _("Bad storage command: %s\n"), dir->msg);
413 Dmsg2(30, "Got storage: %s:%d\n", jcr->stored_addr, stored_port);
414 /* Open command communications with Storage daemon */
415 /* Try to connect for 1 hour at 10 second intervals */
416 sd = bnet_connect(jcr, 10, 3600, _("Storage daemon"),
417 jcr->stored_addr, NULL, stored_port, 1);
419 Jmsg2(jcr, M_FATAL, 0, _("Failed to connect to Storage daemon: %s:%d\n"),
420 jcr->stored_addr, stored_port);
/* backup_cmd(), restore_cmd() and filed_free_jcr() read the socket from here. */
424 jcr->store_bsock = sd;
/* Identify this job to the Storage daemon before authenticating. */
426 bnet_fsend(sd, "Hello Start Job %s\n", jcr->Job);
427 if (!authenticate_storagedaemon(jcr)) {
428 Jmsg(jcr, M_FATAL, 0, _("Failed to authenticate Storage daemon.\n"));
432 /* Send OK to Director */
433 return bnet_fsend(dir, OKstore);
438 * Do a backup. For now, we handle only Full and Incremental.
/*
 * Run a backup over the Storage daemon socket stored in jcr->store_bsock.
 *
 * Sequence: acknowledge the Director with OKbackup, open an append session
 * and read back the ticket, send "append data", stream the files, check
 * the OK_append and OK_end replies, close the session, then signal EOF to
 * both the Storage daemon and the Director.
 *
 * Returns nonzero only when jcr->JobStatus ends up as JS_Terminated.
 */
440 static int backup_cmd(JCR *jcr)
443 BSOCK *dir = jcr->dir_bsock;
444 BSOCK *sd = jcr->store_bsock;
447 jcr->JobStatus = JS_Blocked;
448 jcr->JobType = JT_BACKUP;
449 Dmsg1(100, "begin backup ff=%p\n", jcr->ff);
452 Emsg0(M_FATAL, 0, _("Cannot contact Storage daemon\n"));
453 jcr->JobStatus = JS_ErrorTerminated;
457 bnet_fsend(dir, OKbackup);
458 Dmsg1(10, "bfiled>dird: %s", dir->msg);
461 * Send Append Open Session to Storage daemon
463 bnet_fsend(sd, append_open);
464 Dmsg1(10, ">stored: %s", sd->msg);
466 * Expect to receive back the Ticket number
468 if (bnet_recv(sd) > 0) {
469 Dmsg1(10, "<stored: %s", sd->msg);
/* The ticket is reused for the data, end and close commands below. */
470 if (sscanf(sd->msg, OK_open, &jcr->Ticket) != 1) {
471 Emsg1(M_FATAL, 0, _("Bad response to append open: %s\n"), sd->msg);
472 jcr->JobStatus = JS_ErrorTerminated;
475 Dmsg1(10, "Got Ticket=%d\n", jcr->Ticket);
477 Emsg0(M_FATAL, 0, _("Bad response from stored to open command\n"));
478 jcr->JobStatus = JS_ErrorTerminated;
483 * Send Append data command to Storage daemon
485 bnet_fsend(sd, append_data, jcr->Ticket);
486 Dmsg1(10, ">stored: %s", sd->msg);
489 * Expect to get OK data
/*
 * NOTE(review): this trace runs before response() has received the reply,
 * so sd->msg does not yet hold the Storage daemon's answer here.
 */
491 Dmsg1(10, "<stored: %s", sd->msg);
492 if (!response(sd, OK_data, "Append Data")) {
493 jcr->JobStatus = JS_ErrorTerminated;
498 * Send Files to Storage daemon
500 Dmsg1(100, "begin blast ff=%p\n", jcr->ff);
501 if (!blast_data_to_storage_daemon(jcr, NULL, data_port)) {
502 jcr->JobStatus = JS_ErrorTerminated;
504 jcr->JobStatus = JS_Terminated;
506 * Expect to get response to append_data from Storage daemon
508 if (!response(sd, OK_append, "Append Data")) {
509 jcr->JobStatus = JS_ErrorTerminated;
514 * Send Append End Data to Storage daemon
516 bnet_fsend(sd, append_end, jcr->Ticket);
518 if (!response(sd, OK_end, "Append End")) {
519 jcr->JobStatus = JS_ErrorTerminated;
524 * Send Append Close to Storage daemon
526 bnet_fsend(sd, append_close, jcr->Ticket);
527 while ((len = bnet_recv(sd)) > 0) {
528 /* discard anything else returned from SD */
531 Emsg2(M_FATAL, 0, _("<stored: net_recv len=%d: ERR=%s\n"), len, bnet_strerror(sd));
532 jcr->JobStatus = JS_ErrorTerminated;
538 /* Inform Storage daemon that we are done */
540 bnet_sig(sd, BNET_EOF);
543 /* Inform Director that we are done */
544 bnet_sig(dir, BNET_EOF);
/* Any JS_ErrorTerminated set above makes this comparison false. */
546 return jcr->JobStatus == JS_Terminated;
550 * Do a Verify for Director
/*
 * Handle a verify request: mark the job as JT_VERIFY, acknowledge with
 * OKverify, and finish by signalling EOF to the Director.  Returns the
 * result of that final bnet_sig() call.
 */
553 static int verify_cmd(JCR *jcr)
555 BSOCK *dir = jcr->dir_bsock;
557 jcr->JobType = JT_VERIFY;
558 bnet_fsend(dir, OKverify);
559 Dmsg1(10, "bfiled>dird: %s", dir->msg);
563 /* Inform Director that we are done */
564 return bnet_sig(dir, BNET_EOF);
568 * Do a Restore for Director
/*
 * Run a restore over the Storage daemon socket stored in jcr->store_bsock.
 *
 * Sequence: scan the "where" directory from the command, acknowledge with
 * OKrestore, open a read session using the parameters that session_cmd()
 * stored in the JCR, read back the ticket, send "read data", call
 * do_restore(), close the session, then signal EOF to both peers.
 */
571 static int restore_cmd(JCR *jcr)
576 BSOCK *dir = jcr->dir_bsock;
577 BSOCK *sd = jcr->store_bsock;
581 * Scan WHERE (base directory for restore) from command
583 Dmsg0(50, "restore command\n");
584 /* Pickup where string */
585 where = (char *) get_memory(dir->msglen+1);
/*
 * NOTE(review): the sscanf() result is not checked here, so a command
 * without a where= value leaves `where` unscanned.
 */
587 sscanf(dir->msg, restorecmd, where);
588 Dmsg1(50, "Got where=%s\n", where);
591 bnet_fsend(dir, OKrestore);
592 Dmsg1(10, "bfiled>dird: %s", dir->msg);
594 jcr->JobType = JT_RESTORE;
595 jcr->JobStatus = JS_Blocked;
/* Scratch buffer handed to do_restore(); released at the end of this function. */
596 ip_addr = (char *) get_pool_memory(PM_FNAME);
598 Dmsg4(20, "VolSessId=%ld VolsessT=%ld SF=%ld EF=%ld\n",
599 jcr->VolSessionId, jcr->VolSessionTime, jcr->StartFile, jcr->EndFile);
/* The volume shown in this trace is a fixed literal, not jcr->VolumeName. */
600 Dmsg2(20, "JobId=%d vol=%s\n", jcr->JobId, "DummyVolume");
603 * Open Read Session with Storage daemon
605 bnet_fsend(sd, read_open, jcr->VolumeName,
606 jcr->VolSessionId, jcr->VolSessionTime, jcr->StartFile, jcr->EndFile,
607 jcr->StartBlock, jcr->EndBlock);
608 Dmsg1(10, ">stored: %s", sd->msg);
613 if ((len = bnet_recv(sd)) > 0) {
614 Dmsg1(10, "bfiled<stored: %s", sd->msg);
/* The ticket is reused for the read data and read close commands below. */
615 if (sscanf(sd->msg, OK_open, &jcr->Ticket) != 1) {
616 Emsg1(M_FATAL, 0, _("Bad response to read open: %s\n"), sd->msg);
619 Dmsg1(10, "bfiled: got Ticket=%d\n", jcr->Ticket);
621 Emsg0(M_FATAL, 0, _("Bad response from stored to read open command\n"));
626 * Start read of data with Storage daemon
628 bnet_fsend(sd, read_data, jcr->Ticket);
629 Dmsg1(10, ">stored: %s", sd->msg);
634 if (!response(sd, OK_data, "Read Data")) {
639 * Do restore of files and data
641 do_restore(jcr, ip_addr, data_port);
644 * Send Close session command to Storage daemon
646 bnet_fsend(sd, read_close, jcr->Ticket);
647 Dmsg1(30, "bfiled>stored: %s", sd->msg);
649 /* ****FIXME**** check response */
650 bnet_recv(sd); /* get OK */
652 /* Inform Storage daemon that we are done */
653 bnet_sig(sd, BNET_EOF);
655 /* Inform Director that we are done */
656 bnet_sig(dir, BNET_EOF);
659 free_pool_memory(ip_addr);
660 Dmsg0(30, "Done in job.c\n");
667 * Destroy the Job Control Record and associated
668 * resources (sockets).
/*
 * File-daemon-specific JCR cleanup, registered through new_jcr() in
 * handle_client_request().  Closes the Storage daemon socket if one was
 * opened, and releases jcr->where and jcr->last_fname.
 */
670 static void filed_free_jcr(JCR *jcr)
672 if (jcr->store_bsock) {
673 bnet_close(jcr->store_bsock);
676 free_pool_memory(jcr->where);
/* last_fname was obtained with get_pool_memory() in handle_client_request(). */
678 if (jcr->last_fname) {
679 free_pool_memory(jcr->last_fname);
685 * Get response from Storage daemon to a command we
686 * sent. Check that the response is OK.
688 * Returns: 0 on failure
/*
 * Read one reply from the Storage daemon and compare it with the expected
 * text.
 *
 *   sd    Storage daemon socket to read from
 *   resp  exact reply expected
 *   cmd   command name, used only in the error messages
 */
691 int response(BSOCK *sd, char *resp, char *cmd)
698 if ((n = bnet_recv(sd)) > 0) {
/* Whole-string comparison: the reply must equal resp exactly, newline included. */
700 if (strcmp(sd->msg, resp) == 0) {
704 /* ********FIXME******** segfault if the following is executed */
706 Emsg3(M_FATAL, 0, _("<stored: bad response to %s: wanted: %s, got: %s\n"),
709 Emsg2(M_FATAL, 0, _("<stored: bad response to %s command: ERR=%s\n"),
710 cmd, bnet_strerror(sd));