Public License, v3.0 ("AGPLv3") and some additional permissions and
terms pursuant to its AGPLv3 Section 7.
- This notice must be preserved when any source code is
+ This notice must be preserved when any source code is
conveyed and/or propagated.
Bacula(R) is a registered trademark of Kern Sibbald.
* Bacula File Daemon Job processing
*
* Written by Kern Sibbald, October MM
- *
*/
#include "bacula.h"
/* Static variables */
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
-const bool have_win32 = false;
#ifdef HAVE_ACL
const bool have_acl = true;
#else
const bool have_acl = false;
#endif
-
+
#if HAVE_XATTR
const bool have_xattr = true;
#else
static int set_options(findFOPTS *fo, const char *opts);
static void set_storage_auth_key(JCR *jcr, char *key);
static int sm_dump_cmd(JCR *jcr);
+static int proxy_cmd(JCR *jcr);
+static int fd_testnetwork_cmd(JCR *jcr);
#ifdef DEVELOPER
static int exit_cmd(JCR *jcr);
#endif
/* Exported functions */
+#define ACCESS_MONITOR 1
+#define ACCESS_REMOTE 2
/*
* The following are the recognized commands from the Director.
*/
struct s_cmds cmds[] = {
{"backup", backup_cmd, 0},
- {"cancel", cancel_cmd, 0},
+ {"cancel", cancel_cmd, ACCESS_REMOTE},
{"setdebug=", setdebug_cmd, 0},
- {"setbandwidth=",setbandwidth_cmd, 0},
+ {"setbandwidth=",setbandwidth_cmd, ACCESS_REMOTE},
{"snapshot", snapshot_cmd, 0},
{"estimate", estimate_cmd, 0},
{"Hello", hello_cmd, 1},
{"restore ", restore_cmd, 0},
{"endrestore", end_restore_cmd, 0},
{"session", session_cmd, 0},
- {"status", status_cmd, 1},
- {".status", qstatus_cmd, 1},
+ {"status", status_cmd, ACCESS_MONITOR|ACCESS_REMOTE},
+ {".status", qstatus_cmd, ACCESS_MONITOR|ACCESS_REMOTE},
{"storage ", storage_cmd, 0},
{"verify", verify_cmd, 0},
{"component", component_cmd, 0},
{"accurate", accurate_cmd, 0},
{"restoreobject", restore_object_cmd, 0},
{"sm_dump", sm_dump_cmd, 0},
- {"stop", cancel_cmd, 0},
+ {"stop", cancel_cmd, ACCESS_REMOTE},
+ {"proxy", proxy_cmd, ACCESS_REMOTE},
+ {"testnetwork", fd_testnetwork_cmd, 0},
#ifdef DEVELOPER
{"exit", exit_cmd, 0},
#endif
static char BADjob[] = "2901 Bad Job\n";
static char EndJob[] = "2800 End Job TermCode=%d JobFiles=%d ReadBytes=%lld"
" JobBytes=%lld Errors=%d VSS=%d Encrypt=%d"
- " CommBytes=0 CompressCommBytes=0\n";
+ " CommBytes=%lld CompressCommBytes=%lld\n";
static char OKRunBefore[] = "2000 OK RunBefore\n";
static char OKRunBeforeNow[] = "2000 OK RunBeforeNow\n";
static char OKRunAfter[] = "2000 OK RunAfter\n";
static char read_close[] = "read close session %d\n";
static char read_ctrl[] = "read control %d\n";
+/* Should tell us if a command is authorized or not */
+static bool access_ok(struct s_cmds *cmd, DIRRES* dir)
+{
+ if ((cmd->access & ACCESS_MONITOR) && dir->monitor) {
+ return true;
+ }
+ if ((cmd->access & ACCESS_REMOTE) && dir->remote) {
+ return true;
+ }
+ if (!dir->remote && !dir->monitor) {
+ return true;
+ }
+ return false;
+}
+
/*
* Accept requests from a Director
*
dir->signal(BNET_EOD);
break;
}
- if ((jcr->authenticated) && (!cmds[i].monitoraccess) && (jcr->director->monitor)) {
+ if (jcr->authenticated && !access_ok(&cmds[i], jcr->director)) {
Dmsg1(100, "Command \"%s\" is invalid.\n", cmds[i].cmd);
dir->fsend(invalid_cmd);
dir->signal(BNET_EOD);
/* Run the after job */
run_scripts(jcr, jcr->RunScripts, "ClientAfterJob");
+ /* send any queued messages before reporting the jobstatus to the director */
+ dequeue_messages(jcr);
+
if (jcr->JobId) { /* send EndJob if running a job */
+ uint64_t CommBytes, CommCompressedBytes;
uint32_t vss, encrypt;
/* Send termination status back to Dir */
+ if (jcr->store_bsock) {
+ CommBytes = jcr->store_bsock->CommBytes();
+ CommCompressedBytes = jcr->store_bsock->CommCompressedBytes();
+ } else {
+ CommBytes = CommCompressedBytes = 0;
+ }
encrypt = jcr->crypto.pki_encrypt;
vss = jcr->Snapshot;
dir->fsend(EndJob, jcr->JobStatus, jcr->JobFiles,
jcr->ReadBytes, jcr->JobBytes, jcr->JobErrors, vss,
- encrypt, 0, 0);
+ encrypt, CommBytes, CommCompressedBytes);
+ //Dmsg0(0, dir->msg);
}
generate_daemon_event(jcr, "JobEnd");
generate_plugin_event(jcr, bEventJobEnd);
bail_out:
- dequeue_messages(jcr); /* send any queued messages */
+ dequeue_messages(jcr); /* send any queued messages, will no longer impact
+ * the job status... */
/* Inform Director that we are done */
dir->signal(BNET_TERMINATE);
if (bs->msglen < 25 || bs->msglen > 500) {
goto bail_out;
}
- Dmsg1(100, "Got: %s", bs->msg);
- if (strncmp(bs->msg, "Hello Director", 14) == 0) {
- return handle_director_request(bs);
- }
if (strncmp(bs->msg, "Hello FD: Bacula Storage", 20) ==0) {
return handle_storage_connection(bs);
}
+ if (strncmp(bs->msg, "Hello ", 5) == 0) {
+ return handle_director_request(bs);
+ }
}
bail_out:
Dmsg2(100, "Bad command from %s. Len=%d.\n", bs->who(), bs->msglen);
return NULL;
}
+
+/*
+ * Test the Network between FD/SD
+ */
+static int fd_testnetwork_cmd(JCR *jcr)
+{
+ bool can_compress, ok=true;
+ BSOCK *sd = jcr->store_bsock;
+ int64_t nb=0, nb2=0;
+ char ed1[50];
+ btime_t start, end;
+
+ if (!sd || !jcr->dir_bsock) {
+ return 1;
+ }
+ if (sscanf(jcr->dir_bsock->msg, "testnetwork bytes=%lld", &nb) != 1 || nb <= 0) {
+ sd->fsend("2999 testnetwork command error\n");
+ return 1;
+ }
+
+ /* We disable the comline compression, else all numbers will be wrong */
+ can_compress = sd->can_compress();
+
+ sd->fsend("testnetwork bytes=%lld\n", nb);
+ sd->clear_compress();
+
+ /* In the first step, we send X bytes to the SD */
+ memset(sd->msg, 0xAA, sizeof_pool_memory(sd->msg));
+ sd->msglen = sizeof_pool_memory(sd->msg);
+
+ start = get_current_btime();
+ for (nb2 = nb ; nb2 > 0 && ok ; nb2 -= sd->msglen) {
+ if (nb2 < sd->msglen) {
+ sd->msglen = nb2;
+ }
+ ok = sd->send();
+ }
+ sd->signal(BNET_EOD);
+ end = get_current_btime() + 1;
+
+ if (!ok) {
+ goto bail_out;
+ }
+
+ jcr->dir_bsock->fsend("2000 OK bytes=%lld duration=%lldms write_speed=%sB/s\n",
+ nb, end/1000 - start/1000,
+ edit_uint64_with_suffix(nb * 1000000 / (end - start), ed1));
+
+ /* Now we receive X bytes from the SD */
+ start = get_current_btime();
+ for (nb2 = 0; sd->recv() > 0; nb2 += sd->msglen) { }
+ end = get_current_btime() + 1;
+
+ jcr->dir_bsock->fsend("2000 OK bytes=%lld duration=%lldms read_speed=%sB/s\n",
+ nb2, end/1000 - start/1000,
+ edit_uint64_with_suffix(nb2 * 1000000 / (end - start), ed1));
+
+ jcr->dir_bsock->signal(BNET_CMD_OK);
+
+bail_out:
+ if (can_compress) {
+ sd->set_compress();
+ }
+ if (!ok) {
+ jcr->dir_bsock->fsend("2999 network test failed ERR=%s\n", sd->errmsg);
+ jcr->dir_bsock->signal(BNET_CMD_FAILED);
+ }
+
+ return 1;
+}
+
+static int proxy_cmd(JCR *jcr)
+{
+ bool OK=true, fdcalled = false;
+ BSOCK *cons_bsock;
+ CONSRES *cons = jcr->director->console;
+ int v, maxfd;
+ fd_set fdset;
+ struct timeval tv;
+
+ if (!cons) {
+ cons = (CONSRES *)GetNextRes(R_CONSOLE, NULL);
+ }
+ /* Here, dir_bsock is not really the director, this is a console */
+ cons_bsock = connect_director(jcr, cons);
+ if (!cons_bsock) {
+ jcr->dir_bsock->signal(BNET_ERROR_MSG);
+ jcr->dir_bsock->fsend("2999 proxy error. ERR=%s\n", jcr->errmsg);
+ jcr->dir_bsock->signal(BNET_MAIN_PROMPT);
+ /* Error during the connect */
+ return 1;
+ }
+
+ /* Inform the console that the command is OK */
+ jcr->dir_bsock->fsend("2000 proxy OK.\n");
+ jcr->dir_bsock->signal(BNET_MAIN_PROMPT);
+
+ maxfd = MAX(cons_bsock->m_fd, jcr->dir_bsock->m_fd) + 1;
+
+ /* Start to forward events from one to the other
+ * It can be done with 2 threads, or with a select
+ */
+ do {
+ FD_ZERO(&fdset);
+ FD_SET((unsigned)cons_bsock->m_fd, &fdset);
+ FD_SET((unsigned)jcr->dir_bsock->m_fd, &fdset);
+
+ tv.tv_sec = 5;
+ tv.tv_usec = 0;
+ switch ((v = select(maxfd, &fdset, NULL, NULL, &tv))) {
+ case 0: /* timeout */
+ OK = !jcr->is_canceled();
+ break;
+ case -1:
+ Dmsg1(0, "Bad call to select ERR=%d\n", errno);
+ OK = false;
+ default:
+#ifdef HAVE_TLS
+ if (cons_bsock->tls && !tls_bsock_probe(cons_bsock)) {
+ /* maybe a session key negotiation waked up the socket */
+ FD_CLR(cons_bsock->m_fd, &fdset);
+ }
+ if (jcr->dir_bsock->tls && !tls_bsock_probe(jcr->dir_bsock)) {
+ /* maybe a session key negotiation waked up the socket */
+ FD_CLR(jcr->dir_bsock->m_fd, &fdset);
+ }
+#endif
+ break;
+ }
+ Dmsg1(DT_NETWORK, "select = %d\n", v);
+ if (OK) {
+ if (FD_ISSET(cons_bsock->m_fd, &fdset)) {
+ v = cons_bsock->recv();
+ if (v == BNET_SIGNAL) {
+ if (cons_bsock->msglen == BNET_FDCALLED) {
+ OK = false;
+ fdcalled = true;
+ } else {
+ jcr->dir_bsock->signal(cons_bsock->msglen);
+ }
+
+ } else if (v >= 0) {
+ jcr->dir_bsock->fsend("%s", cons_bsock->msg);
+
+ } else {
+ /* We should not have such kind of message */
+ OK = false;
+ }
+ }
+ if (FD_ISSET(jcr->dir_bsock->m_fd, &fdset)) {
+ v = jcr->dir_bsock->recv();
+ if (v == BNET_SIGNAL) {
+ cons_bsock->signal(jcr->dir_bsock->msglen);
+ } else if (v >= 0) {
+ cons_bsock->fsend("%s", jcr->dir_bsock->msg);
+ } else {
+ /* We should not have such kind of message */
+ OK = false;
+ }
+ }
+ }
+ if (cons_bsock->is_error() || jcr->dir_bsock->is_error()) {
+ OK = false;
+ }
+ } while (OK && !jcr->is_canceled());
+
+ /* Close the socket, nothing more will come */
+ jcr->dir_bsock->signal(BNET_TERMINATE);
+ jcr->dir_bsock->close();
+ if (fdcalled) {
+ handle_connection_request(cons_bsock); /* will release the socket */
+ } else {
+ free_bsock(cons_bsock);
+ }
+ return 1;
+}
+
static int sm_dump_cmd(JCR *jcr)
{
close_memory_pool();
}
#endif
-
-/**
+/*
* Hello from Director he must identify himself and provide his
* password.
*/
Dmsg0(120, "OK Authenticate\n");
jcr->authenticated = true;
+ dequeue_messages(jcr); /* dequeue any daemon messages */
return 1;
}
-/**
+/*
* Cancel a Job
*/
static int cancel_cmd(JCR *jcr)
generate_plugin_event(cjcr, bEventCancelCommand, NULL);
cjcr->setJobStatus(status);
if (cjcr->store_bsock) {
- cjcr->store_bsock->set_timed_out();
- cjcr->store_bsock->set_terminated();
+ cjcr->store_bsock->cancel();
}
cjcr->my_thread_send_signal(TIMEOUT_SIGNAL);
free_jcr(cjcr);
}
debug_level_tags = level_tags;
+ /* Parse specific FD options */
+ for (char *p = options; *p ; p++) {
+ switch(*p) {
+ case 'i':
+ /* Turn on/off ignore bwrite() errors on restore */
+ no_win32_write_errors = true;
+ break;
+ case 'd':
+ /* Turn on/off decomp of BackupRead() streams */
+ win32decomp = true;
+ break;
+ }
+ }
+
/* handle other options */
set_debug_flags(options);
return dir->fsend(OKjob, VERSION, LSMDATE, HOST_OS, DISTNAME, DISTVER);
}
-extern "C" char *job_code_callback_filed(JCR *jcr, const char* param)
+extern "C" char *job_code_callback_filed(JCR *jcr, const char* param, char *buf, int buflen)
{
switch (param[0]) {
- case 'D':
- if (jcr->director) {
- return jcr->director->hdr.name;
- }
- break;
+ case 'D':
+ if (jcr->director) {
+ return jcr->director->hdr.name;
+ }
+ break;
+ case 'S':
+ return jcr->PrevJob;
}
return NULL;
return true;
}
-
static void append_file(JCR *jcr, findINCEXE *incexe,
const char *buf, bool is_file)
{
if (is_file) {
incexe->name_list.append(new_dlistString(buf));
-
} else if (me->plugin_directory) {
generate_plugin_event(jcr, bEventPluginCommand, (void *)buf);
incexe->plugin_list.append(new_dlistString(buf));
-
} else {
Jmsg(jcr, M_FATAL, 0,
_("Plugin Directory not defined. Cannot use plugin: \"%s\"\n"),
}
jcr->sd_auth_key = bstrdup(key);
- Dmsg1(5, "set sd auth key %s\n", jcr->sd_auth_key);
+ Dmsg1(200, "set sd auth key %s\n", jcr->sd_auth_key);
}
/**
}
-/**
+/*
* Do a backup.
*/
static int backup_cmd(JCR *jcr)
* If explicitly requesting FO_ACL or FO_XATTR, fail job if it
* is not available on Client machine
*/
- if (jcr->ff->flags & FO_ACL && !(have_acl||have_win32)) {
+ if (jcr->ff->flags & FO_ACL && !(have_acl)) {
Jmsg(jcr, M_FATAL, 0, _("ACL support not configured for Client.\n"));
goto cleanup;
}
dir->fsend(OKbackup);
Dmsg1(110, "filed>dird: %s", dir->msg);
- /**
+ /*
* Send Append Open Session to Storage daemon
*/
sd->fsend(append_open);
/* Call RunScript just after the Snapshot creation, usually, we restart services */
run_scripts(jcr, jcr->RunScripts, "ClientAfterVSS");
-
- /**
+ /*
* Send Files to Storage daemon
*/
Dmsg1(110, "begin blast ff=%p\n", (FF_PKT *)jcr->ff);
* Send Append Close to Storage daemon
*/
sd->fsend(append_close, jcr->Ticket);
- sd->msg[0] = 0;
while (bget_msg(sd) >= 0) { /* stop on signal or error */
if (sscanf(sd->msg, OK_close, &SDJobStatus) == 1) {
ok = 1;
Dmsg2(200, "SDJobStatus = %d %c\n", SDJobStatus, (char)SDJobStatus);
- } else {
- Dmsg1(100, "append_close: scan fail from %s\n", sd->msg);
}
}
if (!ok) {
Jmsg(jcr, M_FATAL, 0, _("Append Close with SD failed.\n"));
- Dmsg1(100, "append_close: scan fail from %s\n", sd->msg);
goto cleanup;
}
if (!(SDJobStatus == JS_Terminated || SDJobStatus == JS_Warnings ||
}
cleanup:
-
generate_plugin_event(jcr, bEventEndBackupJob);
return 0; /* return and stop command loop */
}
generate_daemon_event(jcr, "JobStart");
generate_plugin_event(jcr, bEventStartRestoreJob);
- do_restore(jcr);
+ if (!jcr->is_canceled()) {
+ do_restore(jcr);
+ }
+
stop_dir_heartbeat(jcr);
jcr->setJobStatus(JS_Terminated);
delete jcr->RunScripts;
free_path_list(jcr);
- if (jcr->JobId != 0)
+ if (jcr->JobId != 0) {
write_state_file(me->working_directory, "bacula-fd", get_first_port_host_order(me->FDaddrs));
-
+ }
return;
}