From 2a7d35bf50de478c79c6b2c0341f6ee8ed6c628f Mon Sep 17 00:00:00 2001 From: Eric Bollengier Date: Thu, 25 Nov 2010 12:41:39 +0100 Subject: [PATCH] Add setbandwidth command in Director (FD protocol is now 4) --- bacula/src/dird/backup.c | 11 ++ bacula/src/dird/fd_cmds.c | 15 +++ bacula/src/dird/protos.h | 2 + bacula/src/dird/ua_cmds.c | 185 ++++++++++++-------------------- bacula/src/dird/ua_select.c | 126 ++++++++++++++++++++++ bacula/src/filed/authenticate.c | 5 +- bacula/src/filed/job.c | 52 ++++++++- bacula/src/jcr.h | 1 + 8 files changed, 275 insertions(+), 122 deletions(-) diff --git a/bacula/src/dird/backup.c b/bacula/src/dird/backup.c index 668d5c55ca..43dd1a839a 100644 --- a/bacula/src/dird/backup.c +++ b/bacula/src/dird/backup.c @@ -399,6 +399,17 @@ bool do_backup(JCR *jcr) goto bail_out; } + /* TODO: See priority with bandwidth parameter */ + if (jcr->job->max_bandwidth > 0) { + jcr->max_bandwidth = jcr->job->max_bandwidth; + } else if (jcr->client->max_bandwidth > 0) { + jcr->max_bandwidth = jcr->client->max_bandwidth; + } + + if (jcr->max_bandwidth > 0) { + send_bwlimit(jcr, jcr->Job); /* Old clients don't have this command */ + } + /* * send Storage daemon address to the File daemon */ diff --git a/bacula/src/dird/fd_cmds.c b/bacula/src/dird/fd_cmds.c index 64f12ef985..de84ac934e 100644 --- a/bacula/src/dird/fd_cmds.c +++ b/bacula/src/dird/fd_cmds.c @@ -52,6 +52,7 @@ static char jobcmd[] = "JobId=%s Job=%s SDid=%u SDtime=%u Authorization=%s\ static char levelcmd[] = "level = %s%s%s mtime_only=%d\n"; static char runscript[] = "Run OnSuccess=%u OnFailure=%u AbortOnError=%u When=%u Command=%s\n"; static char runbeforenow[]= "RunBeforeNow\n"; +static char bandwidthcmd[] = "setbandwidth=%lld Job=%s\n"; /* Responses received from File daemon */ static char OKinc[] = "2000 OK include\n"; @@ -60,6 +61,7 @@ static char OKlevel[] = "2000 OK level\n"; static char OKRunScript[] = "2000 OK RunScript\n"; static char OKRunBeforeNow[] = "2000 OK RunBeforeNow\n"; static char OKRestoreObject[] = "2000 OK ObjectRestored\n"; +static char OKBandwidth[] = "2000 OK Bandwidth\n"; /* Forward referenced functions */ static bool send_list_item(JCR *jcr, const char *code, char *item, BSOCK *fd); @@ -287,6 +289,19 @@ static void send_since_time(JCR *jcr) } } +bool send_bwlimit(JCR *jcr, const char *Job) +{ + BSOCK *fd = jcr->file_bsock; + if (jcr->FDVersion >= 4) { + fd->fsend(bandwidthcmd, jcr->max_bandwidth, Job); + if (!response(jcr, fd, OKBandwidth, "Bandwidth", DISPLAY_ERROR)) { + jcr->max_bandwidth = 0; /* can't set bandwidth limit */ + return false; + } + } + return true; +} + /* * Send level command to FD. * Used for backup jobs and estimate command. diff --git a/bacula/src/dird/protos.h b/bacula/src/dird/protos.h index ae358fd92a..f0ae1822a9 100644 --- a/bacula/src/dird/protos.h +++ b/bacula/src/dird/protos.h @@ -96,6 +96,7 @@ extern int connect_to_file_daemon(JCR *jcr, int retry_interval, extern bool send_include_list(JCR *jcr); extern bool send_exclude_list(JCR *jcr); extern bool send_level_command(JCR *jcr); +extern bool send_bwlimit(JCR *jcr, const char *Job); extern int get_attributes_and_put_in_catalog(JCR *jcr); extern void get_attributes_and_compare_to_catalog(JCR *jcr, JobId_t JobId); extern int put_file_into_catalog(JCR *jcr, long file_index, char *fname, @@ -267,6 +268,7 @@ bool get_pool_dbr(UAContext *ua, POOL_DBR *pr, const char *argk="pool"); bool get_client_dbr(UAContext *ua, CLIENT_DBR *cr); POOL *get_pool_resource(UAContext *ua); POOL *select_pool_resource(UAContext *ua); +JCR *select_running_job(UAContext *ua, const char *reason); CLIENT *get_client_resource(UAContext *ua); int get_job_dbr(UAContext *ua, JOB_DBR *jr); diff --git a/bacula/src/dird/ua_cmds.c b/bacula/src/dird/ua_cmds.c index 16c9e8200c..45df21e6d6 100644 --- a/bacula/src/dird/ua_cmds.c +++ b/bacula/src/dird/ua_cmds.c @@ -90,6 +90,7 @@ static int python_cmd(UAContext *ua, const char *cmd); static int release_cmd(UAContext *ua, const char *cmd); static int reload_cmd(UAContext *ua, const char *cmd); static int setdebug_cmd(UAContext *ua, const char *cmd); +static int setbwlimit_cmd(UAContext *ua, const char *cmd); static int setip_cmd(UAContext *ua, const char *cmd); static int time_cmd(UAContext *ua, const char *cmd); static int trace_cmd(UAContext *ua, const char *cmd); @@ -134,7 +135,7 @@ static struct cmdstruct commands[] = { /* C { NT_("help"), help_cmd, _("Print help on specific command"), NT_("add autodisplay automount cancel create delete disable\n\tenable estimate exit gui label list llist" "\n\tmessages memory mount prune purge python quit query\n\trestore relabel release reload run status" - "\n\tsetdebug setip show sqlquery time trace unmount umount\n\tupdate use var version wait"), false}, + "\n\tsetbandwidth setdebug setip show sqlquery time trace unmount\n\tumount update use var version wait"), false}, { NT_("label"), label_cmd, _("Label a tape"), NT_("storage= volume= pool="), false}, { NT_("list"), list_cmd, _("List objects from catalog"), @@ -174,6 +175,9 @@ static struct cmdstruct commands[] = { /* C { NT_("setdebug"), setdebug_cmd, _("Sets debug level"), NT_("level= trace=0/1 client= | dir | storage= | all"), true}, + { NT_("setbandwidth"), setbwlimit_cmd, _("Sets bandwidth"), + NT_("limit= client= jobid= job= ujobid="), true}, + { NT_("setip"), setip_cmd, _("Sets new client address -- if authorized"), NT_(""), false}, { NT_("show"), show_cmd, _("Show resource records"), NT_("job= | pool= | fileset= schedule= | client= | disabled | all"), true}, @@ -444,126 +448,16 @@ int automount_cmd(UAContext *ua, const char *cmd) return 1; } - /* * Cancel a job */ static int cancel_cmd(UAContext *ua, const char *cmd) { - int i, ret; - int njobs = 0; - JCR *jcr = NULL; - char JobName[MAX_NAME_LENGTH]; - - for (i=1; iargc; i++) { - if (strcasecmp(ua->argk[i], NT_("jobid")) == 0) { - uint32_t JobId; - JobId = str_to_int64(ua->argv[i]); - if (!JobId) { - break; - } - if (!(jcr=get_jcr_by_id(JobId))) { - ua->error_msg(_("JobId %s is not running. Use Job name to cancel inactive jobs.\n"), ua->argv[i]); - return 1; - } - break; - } else if (strcasecmp(ua->argk[i], NT_("job")) == 0) { - if (!ua->argv[i]) { - break; - } - if (!(jcr=get_jcr_by_partial_name(ua->argv[i]))) { - ua->warning_msg(_("Warning Job %s is not running. Continuing anyway ...\n"), ua->argv[i]); - jcr = new_jcr(sizeof(JCR), dird_free_jcr); - bstrncpy(jcr->Job, ua->argv[i], sizeof(jcr->Job)); - } - break; - } else if (strcasecmp(ua->argk[i], NT_("ujobid")) == 0) { - if (!ua->argv[i]) { - break; - } - if (!(jcr=get_jcr_by_full_name(ua->argv[i]))) { - ua->warning_msg(_("Warning Job %s is not running. Continuing anyway ...\n"), ua->argv[i]); - jcr = new_jcr(sizeof(JCR), dird_free_jcr); - bstrncpy(jcr->Job, ua->argv[i], sizeof(jcr->Job)); - } - break; - } - - } - if (jcr) { - if (jcr->job && !acl_access_ok(ua, Job_ACL, jcr->job->name())) { - ua->error_msg(_("Unauthorized command from this console.\n")); - return 1; - } - } else { - /* - * If we still do not have a jcr, - * throw up a list and ask the user to select one. - */ - char buf[1000]; - int tjobs = 0; /* total # number jobs */ - /* Count Jobs running */ - foreach_jcr(jcr) { - if (jcr->JobId == 0) { /* this is us */ - continue; - } - tjobs++; /* count of all jobs */ - if (!acl_access_ok(ua, Job_ACL, jcr->job->name())) { - continue; /* skip not authorized */ - } - njobs++; /* count of authorized jobs */ - } - endeach_jcr(jcr); - - if (njobs == 0) { /* no authorized */ - if (tjobs == 0) { - ua->send_msg(_("No Jobs running.\n")); - } else { - ua->send_msg(_("None of your jobs are running.\n")); - } - return 1; - } - - start_prompt(ua, _("Select Job:\n")); - foreach_jcr(jcr) { - char ed1[50]; - if (jcr->JobId == 0) { /* this is us */ - continue; - } - if (!acl_access_ok(ua, Job_ACL, jcr->job->name())) { - continue; /* skip not authorized */ - } - bsnprintf(buf, sizeof(buf), _("JobId=%s Job=%s"), edit_int64(jcr->JobId, ed1), jcr->Job); - add_prompt(ua, buf); - } - endeach_jcr(jcr); - - if (do_prompt(ua, _("Job"), _("Choose Job to cancel"), buf, sizeof(buf)) < 0) { - return 1; - } - if (ua->api && njobs == 1) { - char nbuf[1000]; - bsnprintf(nbuf, sizeof(nbuf), _("Cancel: %s\n\n%s"), buf, - _("Confirm cancel?")); - if (!get_yesno(ua, nbuf) || ua->pint32_val == 0) { - return 1; - } - } else { - if (njobs == 1) { - if (!get_yesno(ua, _("Confirm cancel (yes/no): ")) || ua->pint32_val == 0) { - return 1; - } - } - } - sscanf(buf, "JobId=%d Job=%127s", &njobs, JobName); - jcr = get_jcr_by_full_name(JobName); - if (!jcr) { - ua->warning_msg(_("Job \"%s\" not found.\n"), JobName); - return 1; - } + JCR *jcr = select_running_job(ua, "cancel"); + if (!jcr) { + return 1; } - - ret = cancel_job(ua, jcr); + int ret = cancel_job(ua, jcr); free_jcr(jcr); return ret; } @@ -794,6 +688,67 @@ static int python_cmd(UAContext *ua, const char *cmd) return 1; } +static int setbwlimit_cmd(UAContext *ua, const char *cmd) +{ + CLIENT *client=NULL; + char Job[MAX_NAME_LENGTH]; + *Job=0; + int32_t limit=-1; + int i; + + i = find_arg_with_value(ua, "limit"); + if (i >= 0) { + limit = atoi(ua->argv[i]); + } + if (limit < 0) { + if (!get_pint(ua, _("Enter new bandwidth limit kb/s: "))) { + return 1; + } + limit = ua->pint32_val * 1024; /* kb/s */ + } + + if (find_arg(ua, "job") > 0) { + JCR *jcr = select_running_job(ua, "limit"); + if (jcr) { + jcr->max_bandwidth = limit; /* TODO: see for locking (Should be safe)*/ + bstrncpy(Job, jcr->Job, sizeof(Job)); + client = jcr->client; + free_jcr(jcr); + } else { + return 1; + } + + } else { + client = get_client_resource(ua); + } + + if (!client) { + return 1; + } + + /* Connect to File daemon */ + ua->jcr->client = client; + ua->jcr->max_bandwidth = limit; + + /* Try to connect for 15 seconds */ + ua->send_msg(_("Connecting to Client %s at %s:%d\n"), + client->name(), client->address, client->FDport); + if (!connect_to_file_daemon(ua->jcr, 1, 15, 0)) { + ua->error_msg(_("Failed to connect to Client.\n")); + return 1; + } + Dmsg0(120, "Connected to file daemon\n"); + if (!send_bwlimit(ua->jcr, Job)) { + ua->error_msg(_("Failed to set bandwidth limit to Client.\n")); + } + + ua->jcr->file_bsock->signal(BNET_TERMINATE); + ua->jcr->file_bsock->close(); + ua->jcr->file_bsock = NULL; + ua->jcr->client = NULL; + ua->jcr->max_bandwidth = 0; + return 1; +} /* * Set a new address in a Client resource. We do this only diff --git a/bacula/src/dird/ua_select.c b/bacula/src/dird/ua_select.c index 1ff4f27fb4..2e427b0a93 100644 --- a/bacula/src/dird/ua_select.c +++ b/bacula/src/dird/ua_select.c @@ -1052,3 +1052,129 @@ bool get_level_from_name(JCR *jcr, const char *level_name) } return found; } + +/* Get a running job + * "reason" is used in user messages + * can be: cancel, limit, ... + * Returns: NULL on error + * JCR on success (should be free_jcr() after) + */ +JCR *select_running_job(UAContext *ua, const char *reason) +{ + int i; + int njobs = 0; + JCR *jcr = NULL; + char JobName[MAX_NAME_LENGTH]; + char temp[256]; + + for (i=1; iargc; i++) { + if (strcasecmp(ua->argk[i], NT_("jobid")) == 0) { + uint32_t JobId; + JobId = str_to_int64(ua->argv[i]); + if (!JobId) { + break; + } + if (!(jcr=get_jcr_by_id(JobId))) { + ua->error_msg(_("JobId %s is not running. Use Job name to %s inactive jobs.\n"), ua->argv[i], _(reason)); + return NULL; + } + break; + } else if (strcasecmp(ua->argk[i], NT_("job")) == 0) { + if (!ua->argv[i]) { + break; + } + if (!(jcr=get_jcr_by_partial_name(ua->argv[i]))) { + ua->warning_msg(_("Warning Job %s is not running. Continuing anyway ...\n"), ua->argv[i]); + jcr = new_jcr(sizeof(JCR), dird_free_jcr); + bstrncpy(jcr->Job, ua->argv[i], sizeof(jcr->Job)); + } + break; + } else if (strcasecmp(ua->argk[i], NT_("ujobid")) == 0) { + if (!ua->argv[i]) { + break; + } + if (!(jcr=get_jcr_by_full_name(ua->argv[i]))) { + ua->warning_msg(_("Warning Job %s is not running. Continuing anyway ...\n"), ua->argv[i]); + jcr = new_jcr(sizeof(JCR), dird_free_jcr); + bstrncpy(jcr->Job, ua->argv[i], sizeof(jcr->Job)); + } + break; + } + + } + if (jcr) { + if (jcr->job && !acl_access_ok(ua, Job_ACL, jcr->job->name())) { + ua->error_msg(_("Unauthorized command from this console.\n")); + return NULL; + } + } else { + /* + * If we still do not have a jcr, + * throw up a list and ask the user to select one. + */ + char buf[1000]; + int tjobs = 0; /* total # number jobs */ + /* Count Jobs running */ + foreach_jcr(jcr) { + if (jcr->JobId == 0) { /* this is us */ + continue; + } + tjobs++; /* count of all jobs */ + if (!acl_access_ok(ua, Job_ACL, jcr->job->name())) { + continue; /* skip not authorized */ + } + njobs++; /* count of authorized jobs */ + } + endeach_jcr(jcr); + + if (njobs == 0) { /* no authorized */ + if (tjobs == 0) { + ua->send_msg(_("No Jobs running.\n")); + } else { + ua->send_msg(_("None of your jobs are running.\n")); + } + return NULL; + } + + start_prompt(ua, _("Select Job:\n")); + foreach_jcr(jcr) { + char ed1[50]; + if (jcr->JobId == 0) { /* this is us */ + continue; + } + if (!acl_access_ok(ua, Job_ACL, jcr->job->name())) { + continue; /* skip not authorized */ + } + bsnprintf(buf, sizeof(buf), _("JobId=%s Job=%s"), edit_int64(jcr->JobId, ed1), jcr->Job); + add_prompt(ua, buf); + } + endeach_jcr(jcr); + bsnprintf(temp, sizeof(temp), _("Choose Job to %s"), _(reason)); + if (do_prompt(ua, _("Job"), temp, buf, sizeof(buf)) < 0) { + return NULL; + } + if (!strcmp(reason, "cancel")) { + if (ua->api && njobs == 1) { + char nbuf[1000]; + bsnprintf(nbuf, sizeof(nbuf), _("Cancel: %s\n\n%s"), buf, + _("Confirm cancel?")); + if (!get_yesno(ua, nbuf) || ua->pint32_val == 0) { + return NULL; + } + } else { + if (njobs == 1) { + if (!get_yesno(ua, _("Confirm cancel (yes/no): ")) || ua->pint32_val == 0) { + return NULL; + } + } + } + } + sscanf(buf, "JobId=%d Job=%127s", &njobs, JobName); + jcr = get_jcr_by_full_name(JobName); + if (!jcr) { + ua->warning_msg(_("Job \"%s\" not found.\n"), JobName); + return NULL; + } + } + return jcr; +} diff --git a/bacula/src/filed/authenticate.c b/bacula/src/filed/authenticate.c index 2bff4e07e2..6180b1910d 100644 --- a/bacula/src/filed/authenticate.c +++ b/bacula/src/filed/authenticate.c @@ -41,9 +41,10 @@ const int dbglvl = 50; * prior to 10Mar08 no version * 1 10Mar08 * 2 13Mar09 - added the ability to restore from multiple storages - * 3 03Sep10 - added the restore object command for vss plugin + * 3 03Sep10 - added the restore object command for vss plugin 4.0 + * 4 25Nov10 - added bandwidth command 5.1 */ -static char OK_hello[] = "2000 OK Hello 3\n"; +static char OK_hello[] = "2000 OK Hello 4\n"; static char Dir_sorry[] = "2999 Authentication failed.\n"; static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; diff --git a/bacula/src/filed/job.c b/bacula/src/filed/job.c index 6453b81a92..4b29d077cf 100644 --- a/bacula/src/filed/job.c +++ b/bacula/src/filed/job.c @@ -70,6 +70,7 @@ static int backup_cmd(JCR *jcr); static int bootstrap_cmd(JCR *jcr); static int cancel_cmd(JCR *jcr); static int setdebug_cmd(JCR *jcr); +static int setbandwidth_cmd(JCR *jcr); static int estimate_cmd(JCR *jcr); static int hello_cmd(JCR *jcr); static int job_cmd(JCR *jcr); @@ -111,6 +112,7 @@ static struct s_cmds cmds[] = { {"backup", backup_cmd, 0}, {"cancel", cancel_cmd, 0}, {"setdebug=", setdebug_cmd, 0}, + {"setbandwidth=",setbandwidth_cmd, 0}, {"estimate", estimate_cmd, 0}, {"Hello", hello_cmd, 1}, {"fileset", fileset_cmd, 0}, @@ -152,11 +154,13 @@ static char estimatecmd[] = "estimate listing=%d"; static char runbefore[] = "RunBeforeJob %s"; static char runafter[] = "RunAfterJob %s"; static char runscript[] = "Run OnSuccess=%d OnFailure=%d AbortOnError=%d When=%d Command=%s"; +static char setbandwidth[]= "setbandwidth=%lld Job=%127s"; /* Responses sent to Director */ static char errmsg[] = "2999 Invalid command\n"; static char no_auth[] = "2998 No Authorization\n"; static char invalid_cmd[] = "2997 Invalid command for a Director with Monitor directive enabled.\n"; +static char OKBandwidth[] = "2000 OK Bandwidth\n"; static char OKinc[] = "2000 OK include\n"; static char OKest[] = "2000 OK estimate files=%s bytes=%s\n"; static char OKlevel[] = "2000 OK level\n"; @@ -469,6 +473,41 @@ static int cancel_cmd(JCR *jcr) return 1; } +/** + * Set bandwidth limit as requested by the Director + * + */ +static int setbandwidth_cmd(JCR *jcr) +{ + BSOCK *dir = jcr->dir_bsock; + int64_t bw=0; + JCR *cjcr; + char Job[MAX_NAME_LENGTH]; + *Job=0; + + if (sscanf(dir->msg, setbandwidth, &bw, Job) != 2 || bw < 0) { + pm_strcpy(jcr->errmsg, dir->msg); + dir->fsend(_("2991 Bad setbandwidth command: %s\n"), jcr->errmsg); + return 0; + } + + if (*Job) { + if(!(cjcr=get_jcr_by_full_name(Job))) { + dir->fsend(_("2901 Job %s not found.\n"), Job); + } else { + if (cjcr->store_bsock) { + cjcr->store_bsock->set_bwlimit(bw); + } + cjcr->max_bandwidth = bw; + free_jcr(cjcr); + } + + } else { /* No job requested, apply globally */ + me->max_bandwidth = bw; /* Overwrite directive */ + } + + return dir->fsend(OKBandwidth); +} /** * Set debug level as requested by the Director @@ -1696,12 +1735,15 @@ static int storage_cmd(JCR *jcr) sd->set_source_address(me->FDsrc_addr); /* TODO: see if we put limit on restore and backup... */ - if (jcr->director->max_bandwidth) { - sd->set_bwlimit(jcr->director->max_bandwidth); - - } else if (me->max_bandwidth) { - sd->set_bwlimit(me->max_bandwidth); + if (!jcr->max_bandwidth) { + if (jcr->director->max_bandwidth) { + jcr->max_bandwidth = jcr->director->max_bandwidth; + + } else if (me->max_bandwidth) { + jcr->max_bandwidth = me->max_bandwidth; + } } + sd->set_bwlimit(jcr->max_bandwidth); if (!sd->connect(jcr, 10, (int)me->SDConnectTimeout, me->heartbeat_interval, _("Storage daemon"), jcr->stored_addr, NULL, stored_port, 1)) { diff --git a/bacula/src/jcr.h b/bacula/src/jcr.h index 2cffad669a..f331c53425 100644 --- a/bacula/src/jcr.h +++ b/bacula/src/jcr.h @@ -279,6 +279,7 @@ public: char *plugin_options; /* user set options for plugin */ bool cmd_plugin; /* Set when processing a command Plugin = */ POOLMEM *comment; /* Comment for this Job */ + int64_t max_bandwidth; /* Bandwidth limit for this Job */ /* Daemon specific part of JCR */ /* This should be empty in the library */ -- 2.39.5