X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=bacula%2Fsrc%2Fdird%2Fjob.c;h=3ac5feb64533679e8bc5b51c9cee7e8ef2196a9f;hb=86da6147f63b29cb85d51620b55bae7266f1c890;hp=476ee755fef4f1fb72c9625af9f5c76601028915;hpb=8efa2f85ff6c540009d8b435d57ce65f64f5262b;p=bacula%2Fbacula diff --git a/bacula/src/dird/job.c b/bacula/src/dird/job.c index 476ee755fe..3ac5feb645 100644 --- a/bacula/src/dird/job.c +++ b/bacula/src/dird/job.c @@ -1,36 +1,25 @@ /* - Bacula® - The Network Backup Solution - - Copyright (C) 2000-2010 Free Software Foundation Europe e.V. - - The main author of Bacula is Kern Sibbald, with contributions from - many others, a complete list can be found in the file AUTHORS. - This program is Free Software; you can redistribute it and/or - modify it under the terms of version three of the GNU Affero General Public - License as published by the Free Software Foundation and included - in the file LICENSE. - - This program is distributed in the hope that it will be useful, but - WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program; if not, write to the Free Software - Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA - 02110-1301, USA. - - Bacula® is a registered trademark of Kern Sibbald. - The licensor of Bacula is the Free Software Foundation Europe - (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich, - Switzerland, email:ftf@fsfeurope.org. + Bacula(R) - The Network Backup Solution + + Copyright (C) 2000-2017 Kern Sibbald + + The original author of Bacula is Kern Sibbald, with contributions + from many others, a complete list can be found in the file AUTHORS. + + You may use this file and others of this release according to the + license defined in the LICENSE file, which includes the Affero General + Public License, v3.0 ("AGPLv3") and some additional permissions and + terms pursuant to its AGPLv3 Section 7. + + This notice must be preserved when any source code is + conveyed and/or propagated. + + Bacula(R) is a registered trademark of Kern Sibbald. */ /* - * * Bacula Director Job processing routines * * Kern Sibbald, October MM - * */ #include "bacula.h" @@ -97,15 +86,15 @@ JobId_t run_job(JCR *jcr) return jcr->JobId; } return 0; -} +} -bool setup_job(JCR *jcr) +bool setup_job(JCR *jcr) { int errstat; jcr->lock(); Dsm_check(100); - init_msg(jcr, jcr->messages); + init_msg(jcr, jcr->messages, job_code_callback_director); /* Initialize termination condition variable */ if ((errstat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) { @@ -124,20 +113,25 @@ bool setup_job(JCR *jcr) * Open database */ Dmsg0(100, "Open database\n"); - jcr->db = db_init_database(jcr, jcr->catalog->db_driver, jcr->catalog->db_name, - jcr->catalog->db_user, jcr->catalog->db_password, - jcr->catalog->db_address, jcr->catalog->db_port, - jcr->catalog->db_socket, jcr->catalog->mult_db_connections, - jcr->catalog->disable_batch_insert); + jcr->db = db_init_database(jcr, jcr->catalog->db_driver, jcr->catalog->db_name, + jcr->catalog->db_user, jcr->catalog->db_password, + jcr->catalog->db_address, jcr->catalog->db_port, + jcr->catalog->db_socket, jcr->catalog->db_ssl_key, + jcr->catalog->db_ssl_cert, jcr->catalog->db_ssl_ca, + jcr->catalog->db_ssl_capath, jcr->catalog->db_ssl_cipher, + jcr->catalog->mult_db_connections, + jcr->catalog->disable_batch_insert); if (!jcr->db || !db_open_database(jcr, jcr->db)) { Jmsg(jcr, M_FATAL, 0, _("Could not open database \"%s\".\n"), jcr->catalog->db_name); if (jcr->db) { Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db)); db_close_database(jcr, jcr->db); + jcr->db = NULL; } goto bail_out; } + Dmsg0(150, "DB opened\n"); if (!jcr->fname) { jcr->fname = get_pool_memory(PM_FNAME); @@ -146,6 +140,10 @@ bool setup_job(JCR *jcr) jcr->pool_source = get_pool_memory(PM_MESSAGE); pm_strcpy(jcr->pool_source, _("unknown source")); } + if (!jcr->next_pool_source) { + jcr->next_pool_source = get_pool_memory(PM_MESSAGE); + pm_strcpy(jcr->next_pool_source, _("unknown source")); + } if (jcr->JobReads()) { if (!jcr->rpool_source) { @@ -172,7 +170,7 @@ bool setup_job(JCR *jcr) generate_daemon_event(jcr, "JobStart"); new_plugins(jcr); /* instantiate plugins for this jcr */ - generate_plugin_event(jcr, bEventJobStart); + generate_plugin_event(jcr, bDirEventJobStart); if (job_canceled(jcr)) { goto bail_out; @@ -221,8 +219,131 @@ bool setup_job(JCR *jcr) break; case JT_COPY: case JT_MIGRATE: - if (!do_migration_init(jcr)) { - migration_cleanup(jcr, JS_ErrorTerminated); + if (!do_mac_init(jcr)) { + mac_cleanup(jcr, JS_ErrorTerminated, JS_ErrorTerminated); + goto bail_out; + } + break; + default: + Pmsg1(0, _("Unimplemented job type: %d\n"), jcr->getJobType()); + jcr->setJobStatus(JS_ErrorTerminated); + goto bail_out; + } + + generate_plugin_event(jcr, bDirEventJobInit); + Dsm_check(100); + return true; + +bail_out: + return false; +} + +/* + * Setup a job for a resume command + */ +static bool setup_resume_job(JCR *jcr, JOB_DBR *jr) +{ + int errstat; + jcr->lock(); + Dsm_check(100); + init_msg(jcr, jcr->messages); + + /* Initialize termination condition variable */ + if ((errstat = pthread_cond_init(&jcr->term_wait, NULL)) != 0) { + berrno be; + Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(errstat)); + jcr->unlock(); + goto bail_out; + } + jcr->term_wait_inited = true; + + jcr->setJobStatus(JS_Created); + jcr->unlock(); + + /* + * Open database + */ + Dmsg0(100, "Open database\n"); + jcr->db = db_init_database(jcr, jcr->catalog->db_driver, jcr->catalog->db_name, + jcr->catalog->db_user, jcr->catalog->db_password, + jcr->catalog->db_address, jcr->catalog->db_port, + jcr->catalog->db_socket, jcr->catalog->db_ssl_key, + jcr->catalog->db_ssl_cert, jcr->catalog->db_ssl_ca, + jcr->catalog->db_ssl_capath, jcr->catalog->db_ssl_cipher, + jcr->catalog->mult_db_connections, + jcr->catalog->disable_batch_insert); + if (!jcr->db || !db_open_database(jcr, jcr->db)) { + Jmsg(jcr, M_FATAL, 0, _("Could not open database \"%s\".\n"), + jcr->catalog->db_name); + if (jcr->db) { + Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db)); + db_close_database(jcr, jcr->db); + jcr->db = NULL; + } + goto bail_out; + } + Dmsg0(100, "DB opened\n"); + if (!jcr->fname) { + jcr->fname = get_pool_memory(PM_FNAME); + } + if (!jcr->pool_source) { + jcr->pool_source = get_pool_memory(PM_MESSAGE); + pm_strcpy(jcr->pool_source, _("unknown source")); + } + if (!jcr->next_pool_source) { + jcr->next_pool_source = get_pool_memory(PM_MESSAGE); + pm_strcpy(jcr->next_pool_source, _("unknown source")); + } + + + /* + * Setup Job record. Make sure original job is Incomplete. + */ + memcpy(&jcr->jr, jr, sizeof(JOB_DBR)); + jcr->sched_time = jcr->jr.SchedTime; + jcr->start_time = jcr->jr.StartTime; + jcr->jr.EndTime = 0; /* perhaps rescheduled, clear it */ + jcr->setJobType(jcr->jr.JobType); + jcr->setJobLevel(jcr->jr.JobLevel); + jcr->JobId = jcr->jr.JobId; + if (!get_or_create_client_record(jcr)) { + Dmsg0(100, "Could not create client record.\n"); + goto bail_out; + } + + Dmsg6(100, "Got job record JobId=%d Job=%s Name=%s Type=%c Level=%c Status=%c\n", + jcr->jr.JobId, jcr->jr.Job, jcr->jr.Name, jcr->jr.JobType, jcr->jr.JobLevel, + jcr->jr.JobStatus); + if (jcr->jr.JobStatus != JS_Incomplete) { + /* ***FIXME*** add error message */ + Dmsg1(100, "Job is not an Incomplete: status=%c\n", jcr->jr.JobStatus); + goto bail_out; + } + bstrncpy(jcr->Job, jcr->jr.Job, sizeof(jcr->Job)); + jcr->setJobType(jcr->jr.JobType); + jcr->setJobLevel(jcr->jr.JobLevel); + + generate_daemon_event(jcr, "JobStart"); + new_plugins(jcr); /* instantiate plugins for this jcr */ + generate_plugin_event(jcr, bDirEventJobStart); + + if (job_canceled(jcr)) { + Dmsg0(100, "Oops. Job canceled\n"); + goto bail_out; + } + + /* Re-run the old job */ + jcr->rerunning = true; + + /* + * Now, do pre-run stuff, like setting job level (Inc/diff, ...) + * this allows us to setup a proper job start record for restarting + * in case of later errors. + */ + switch (jcr->getJobType()) { + case JT_BACKUP: + if (!do_backup_init(jcr)) { + backup_cleanup(jcr, JS_ErrorTerminated); goto bail_out; } break; @@ -232,8 +353,7 @@ bool setup_job(JCR *jcr) goto bail_out; } - generate_job_event(jcr, "JobInit"); - generate_plugin_event(jcr, bEventJobInit); + generate_plugin_event(jcr, bDirEventJobInit); Dsm_check(100); return true; @@ -241,6 +361,24 @@ bail_out: return false; } +JobId_t resume_job(JCR *jcr, JOB_DBR *jr) +{ + int stat; + if (setup_resume_job(jcr, jr)) { + Dmsg0(200, "Add jrc to work queue\n"); + /* Queue the job to be run */ + if ((stat = jobq_add(&job_queue, jcr)) != 0) { + berrno be; + Jmsg(jcr, M_FATAL, 0, _("Could not add job queue: ERR=%s\n"), be.bstrerror(stat)); + return 0; + } + return jcr->JobId; + } + return 0; +} + + + void update_job_end(JCR *jcr, int TermCode) { dequeue_messages(jcr); /* display any queued messages */ @@ -305,8 +443,7 @@ static void *job_thread(void *arg) if (!db_update_job_start_record(jcr, jcr->db, &jcr->jr)) { Jmsg(jcr, M_FATAL, 0, "%s", db_strerror(jcr->db)); } - generate_job_event(jcr, "JobRun"); - generate_plugin_event(jcr, bEventJobRun); + generate_plugin_event(jcr, bDirEventJobRun); switch (jcr->getJobType()) { case JT_BACKUP: @@ -339,10 +476,10 @@ static void *job_thread(void *arg) break; case JT_COPY: case JT_MIGRATE: - if (!job_canceled(jcr) && do_migration(jcr)) { + if (!job_canceled(jcr) && do_mac(jcr)) { do_autoprune(jcr); } else { - migration_cleanup(jcr, JS_ErrorTerminated); + mac_cleanup(jcr, JS_ErrorTerminated, JS_ErrorTerminated); } break; default: @@ -358,7 +495,7 @@ static void *job_thread(void *arg) } generate_daemon_event(jcr, "JobEnd"); - generate_plugin_event(jcr, bEventJobEnd); + generate_plugin_event(jcr, bDirEventJobEnd); Dmsg1(50, "======== End Job stat=%c ==========\n", jcr->JobStatus); Dsm_check(100); return NULL; @@ -368,7 +505,7 @@ void sd_msg_thread_send_signal(JCR *jcr, int sig) { jcr->lock(); if ( !jcr->sd_msg_thread_done - && jcr->SD_msg_chan + && jcr->SD_msg_chan_started && !pthread_equal(jcr->SD_msg_chan, pthread_self())) { Dmsg1(800, "Send kill to SD msg chan jid=%d\n", jcr->JobId); @@ -377,6 +514,155 @@ void sd_msg_thread_send_signal(JCR *jcr, int sig) jcr->unlock(); } +static bool cancel_file_daemon_job(UAContext *ua, const char *cmd, JCR *jcr) +{ + CLIENT *old_client; + + if (!jcr->client) { + Dmsg0(100, "No client to cancel\n"); + return false; + } + old_client = ua->jcr->client; + ua->jcr->client = jcr->client; + if (!connect_to_file_daemon(ua->jcr, 10, FDConnectTimeout, 1)) { + ua->error_msg(_("Failed to connect to File daemon.\n")); + ua->jcr->client = old_client; + return false; + } + Dmsg3(10, "Connected to file daemon %s for cancel ua.jcr=%p jcr=%p\n", + ua->jcr->client->name(), ua->jcr, jcr); + BSOCK *fd = ua->jcr->file_bsock; + fd->fsend("%s Job=%s\n", cmd, jcr->Job); + while (fd->recv() >= 0) { + ua->send_msg("%s", fd->msg); + } + fd->signal(BNET_TERMINATE); + free_bsock(ua->jcr->file_bsock); + ua->jcr->client = old_client; + return true; +} + +static bool cancel_sd_job(UAContext *ua, const char *cmd, JCR *jcr) +{ + if (jcr->store_bsock) { + if (jcr->rstorage) { + copy_wstorage(ua->jcr, jcr->rstorage, _("Job resource")); + } else { + copy_wstorage(ua->jcr, jcr->wstorage, _("Job resource")); + } + } else { + USTORE store; + if (jcr->rstorage) { + store.store = jcr->rstore; + } else { + store.store = jcr->wstore; + } + set_wstorage(ua->jcr, &store); + } + + if (!ua->jcr->wstore) { + ua->error_msg(_("Failed to select Storage daemon.\n")); + return false; + } + + if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) { + ua->error_msg(_("Failed to connect to Storage daemon.\n")); + return false; + } + + Dmsg3(10, "Connected to storage daemon %s for cancel ua.jcr=%p jcr=%p\n", + ua->jcr->wstore->name(), ua->jcr, jcr); + + BSOCK *sd = ua->jcr->store_bsock; + sd->fsend("%s Job=%s\n", cmd, jcr->Job); + while (sd->recv() >= 0) { + ua->send_msg("%s", sd->msg); + } + sd->signal(BNET_TERMINATE); + free_bsock(ua->jcr->store_bsock); + return true; +} + +/* The FD is not connected, so we try to complete JCR fields and send + * the cancel command. + */ +int cancel_inactive_job(UAContext *ua) +{ + CLIENT_DBR cr; + JOB_DBR jr; + int i; + USTORE store; + CLIENT *client; + JCR *jcr = new_jcr(sizeof(JCR), dird_free_jcr); + + memset(&jr, 0, sizeof(jr)); + memset(&cr, 0, sizeof(cr)); + + if ((i = find_arg_with_value(ua, "jobid")) > 0) { + jr.JobId = str_to_int64(ua->argv[i]); + + } else if ((i = find_arg_with_value(ua, "ujobid")) > 0) { + bstrncpy(jr.Job, ua->argv[i], sizeof(jr.Job)); + + } else { + ua->error_msg(_("jobid/ujobid argument not found.\n")); + goto bail_out; + } + + if (!open_client_db(ua)) { + goto bail_out; + } + + if (!db_get_job_record(ua->jcr, ua->db, &jr)) { + ua->error_msg(_("Job %ld/%s not found in database.\n"), jr.JobId, jr.Job); + goto bail_out; + } + + if (!acl_access_ok(ua, Job_ACL, jr.Name)) { + ua->error_msg(_("Job %s is not accessible from this console\n"), jr.Name); + goto bail_out; + } + + cr.ClientId = jr.ClientId; + if (!cr.ClientId || !db_get_client_record(ua->jcr, ua->db, &cr)) { + ua->error_msg(_("Client %ld not found in database.\n"), jr.ClientId); + goto bail_out; + } + + if (acl_access_client_ok(ua, cr.Name, jr.JobType)) { + client = (CLIENT *)GetResWithName(R_CLIENT, cr.Name); + if (client) { + jcr->client = client; + } else { + Jmsg1(jcr, M_FATAL, 0, _("Client resource \"%s\" does not exist.\n"), cr.Name); + goto bail_out; + } + } else { + goto bail_out; + } + + jcr->JobId = jr.JobId; + bstrncpy(jcr->Job, jr.Job, sizeof(jcr->Job)); + + cancel_file_daemon_job(ua, "cancel", jcr); + + /* At this time, we can't really guess the storage name from + * the job record + */ + store.store = get_storage_resource(ua, false/*no default*/, true/*unique*/); + if (!store.store) { + goto bail_out; + } + + set_wstorage(jcr, &store); + cancel_sd_job(ua, "cancel", jcr); + +bail_out: + jcr->JobId = 0; + free_jcr(jcr); + return 1; +} + /* * Cancel a job -- typically called by the UA (Console program), but may also * be called by the job watchdog. @@ -384,13 +670,28 @@ void sd_msg_thread_send_signal(JCR *jcr, int sig) * Returns: true if cancel appears to be successful * false on failure. Message sent to ua->jcr. */ -bool cancel_job(UAContext *ua, JCR *jcr) +bool +cancel_job(UAContext *ua, JCR *jcr, int wait, bool cancel) { - BSOCK *sd, *fd; char ed1[50]; int32_t old_status = jcr->JobStatus; + int status; + const char *reason, *cmd; + + Dmsg3(10, "cancel_job jcr=%p jobid=%d use_count\n", jcr, jcr->JobId, jcr->use_count()); + + if (cancel) { + status = JS_Canceled; + reason = _("canceled"); + cmd = NT_("cancel"); + } else { + status = JS_Incomplete; + reason = _("stopped"); + cmd = NT_("stop"); + jcr->RescheduleIncompleteJobs = false; /* do not restart */ + } - jcr->setJobStatus(JS_Canceled); + jcr->setJobStatus(status); switch (old_status) { case JS_Created: @@ -400,68 +701,73 @@ bool cancel_job(UAContext *ua, JCR *jcr) case JS_WaitPriority: case JS_WaitMaxJobs: case JS_WaitStartTime: - ua->info_msg(_("JobId %s, Job %s marked to be canceled.\n"), - edit_uint64(jcr->JobId, ed1), jcr->Job); + case JS_WaitDevice: + ua->info_msg(_("JobId %s, Job %s marked to be %s.\n"), + edit_uint64(jcr->JobId, ed1), jcr->Job, + reason); jobq_remove(&job_queue, jcr); /* attempt to remove it from queue */ break; default: + /* Cancel File daemon */ if (jcr->file_bsock) { - ua->jcr->client = jcr->client; - if (!connect_to_file_daemon(ua->jcr, 10, FDConnectTimeout, 1)) { - ua->error_msg(_("Failed to connect to File daemon.\n")); - return 0; - } - Dmsg0(200, "Connected to file daemon\n"); - fd = ua->jcr->file_bsock; - fd->fsend("cancel Job=%s\n", jcr->Job); - while (fd->recv() >= 0) { - ua->send_msg("%s", fd->msg); - } - fd->signal(BNET_TERMINATE); - fd->close(); - ua->jcr->file_bsock = NULL; + btimer_t *tid; + /* do not return now, we want to try to cancel the sd */ + tid = start_bsock_timer(jcr->file_bsock, 120); + cancel_file_daemon_job(ua, cmd, jcr); + stop_bsock_timer(tid); + } + + /* We test file_bsock because the previous operation can take + * several minutes + */ + if (jcr->file_bsock && cancel) { jcr->file_bsock->set_terminated(); jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } /* Cancel Storage daemon */ if (jcr->store_bsock) { - if (!ua->jcr->wstorage) { - if (jcr->rstorage) { - copy_wstorage(ua->jcr, jcr->rstorage, _("Job resource")); - } else { - copy_wstorage(ua->jcr, jcr->wstorage, _("Job resource")); - } - } else { - USTORE store; - if (jcr->rstorage) { - store.store = jcr->rstore; - } else { - store.store = jcr->wstore; - } - set_wstorage(ua->jcr, &store); - } + btimer_t *tid; + /* do not return now, we want to try to cancel the sd socket */ + tid = start_bsock_timer(jcr->store_bsock, 120); + cancel_sd_job(ua, cmd, jcr); + stop_bsock_timer(tid); + } - if (!connect_to_storage_daemon(ua->jcr, 10, SDConnectTimeout, 1)) { - ua->error_msg(_("Failed to connect to Storage daemon.\n")); - return false; - } - Dmsg0(200, "Connected to storage daemon\n"); - sd = ua->jcr->store_bsock; - sd->fsend("cancel Job=%s\n", jcr->Job); - while (sd->recv() >= 0) { - ua->send_msg("%s", sd->msg); - } - sd->signal(BNET_TERMINATE); - sd->close(); - ua->jcr->store_bsock = NULL; + /* We test file_bsock because the previous operation can take + * several minutes + */ + if (jcr->store_bsock && cancel) { jcr->store_bsock->set_timed_out(); jcr->store_bsock->set_terminated(); sd_msg_thread_send_signal(jcr, TIMEOUT_SIGNAL); jcr->my_thread_send_signal(TIMEOUT_SIGNAL); } + + /* Cancel Copy/Migration Storage daemon */ + if (jcr->wjcr) { + /* The wjcr is valid until we call free_jcr(jcr) */ + JCR *wjcr = jcr->wjcr; + + if (wjcr->store_bsock) { + btimer_t *tid; + /* do not return now, we want to try to cancel the sd socket */ + tid = start_bsock_timer(wjcr->store_bsock, 120); + cancel_sd_job(ua, cmd, wjcr); + stop_bsock_timer(tid); + } + /* We test store_bsock because the previous operation can take + * several minutes + */ + if (wjcr->store_bsock && cancel) { + wjcr->store_bsock->set_timed_out(); + wjcr->store_bsock->set_terminated(); + sd_msg_thread_send_signal(wjcr, TIMEOUT_SIGNAL); + wjcr->my_thread_send_signal(TIMEOUT_SIGNAL); + } + } break; } @@ -470,7 +776,7 @@ bool cancel_job(UAContext *ua, JCR *jcr) void cancel_storage_daemon_job(JCR *jcr) { - if (jcr->sd_canceled) { + if (jcr->sd_canceled) { return; /* cancel only once */ } @@ -482,9 +788,9 @@ void cancel_storage_daemon_job(JCR *jcr) if (jcr->store_bsock) { if (!ua->jcr->wstorage) { if (jcr->rstorage) { - copy_wstorage(ua->jcr, jcr->rstorage, _("Job resource")); + copy_wstorage(ua->jcr, jcr->rstorage, _("Job resource")); } else { - copy_wstorage(ua->jcr, jcr->wstorage, _("Job resource")); + copy_wstorage(ua->jcr, jcr->wstorage, _("Job resource")); } } else { USTORE store; @@ -505,8 +811,7 @@ void cancel_storage_daemon_job(JCR *jcr) while (sd->recv() >= 0) { } sd->signal(BNET_TERMINATE); - sd->close(); - ua->jcr->store_bsock = NULL; + free_bsock(ua->jcr->store_bsock); jcr->sd_canceled = true; jcr->store_bsock->set_timed_out(); jcr->store_bsock->set_terminated(); @@ -525,14 +830,33 @@ static void job_monitor_destructor(watchdog_t *self) free_jcr(control_jcr); } -static void job_monitor_watchdog(watchdog_t *self) +extern "C" void *cancel_thread(void *arg) { - JCR *control_jcr, *jcr; + JCR *jcr = (JCR *)arg; + UAContext *ua; + JCR *control_jcr; - control_jcr = (JCR *)self->data; + pthread_detach(pthread_self()); + ua = new_ua_context(jcr); + control_jcr = new_control_jcr("*CancelThread*", JT_SYSTEM); + ua->jcr = control_jcr; + + Dmsg3(400, "Cancelling JCR %p JobId=%d (%s)\n", jcr, jcr->JobId, jcr->Job); + cancel_job(ua, jcr, 120); + Dmsg2(400, "Have cancelled JCR %p JobId=%d\n", jcr, jcr->JobId); + + free_ua_context(ua); + free_jcr(control_jcr); + free_jcr(jcr); + return NULL; +} + +static void job_monitor_watchdog(watchdog_t *wd) +{ + JCR *jcr; Dsm_check(100); - Dmsg1(800, "job_monitor_watchdog %p called\n", self); + Dmsg1(800, "job_monitor_watchdog %p called\n", wd); foreach_jcr(jcr) { bool cancel = false; @@ -552,7 +876,7 @@ static void job_monitor_watchdog(watchdog_t *self) jcr->setJobStatus(JS_Canceled); Qmsg(jcr, M_FATAL, 0, _("Max run time exceeded. Job canceled.\n")); cancel = true; - /* check MaxRunSchedTime */ + /* check MaxRunSchedTime */ } else if (job_check_maxrunschedtime(jcr)) { jcr->setJobStatus(JS_Canceled); Qmsg(jcr, M_FATAL, 0, _("Max run sched time exceeded. Job canceled.\n")); @@ -560,14 +884,15 @@ static void job_monitor_watchdog(watchdog_t *self) } if (cancel) { - Dmsg3(800, "Cancelling JCR %p jobid %d (%s)\n", jcr, jcr->JobId, jcr->Job); - UAContext *ua = new_ua_context(jcr); - ua->jcr = control_jcr; - cancel_job(ua, jcr); - free_ua_context(ua); - Dmsg2(800, "Have cancelled JCR %p Job=%d\n", jcr, jcr->JobId); + pthread_t thid; + int status; + jcr->inc_use_count(); + if ((status=pthread_create(&thid, NULL, cancel_thread, (void *)jcr)) != 0) { + berrno be; + Jmsg1(jcr, M_WARNING, 0, _("Cannot create cancel thread: ERR=%s\n"), be.bstrerror(status)); + free_jcr(jcr); + } } - } /* Keep reference counts correct */ endeach_jcr(jcr); @@ -591,7 +916,7 @@ static bool job_check_maxwaittime(JCR *jcr) current = watchdog_time - jcr->wait_time; } - Dmsg2(200, "check maxwaittime %u >= %u\n", + Dmsg2(200, "check maxwaittime %u >= %u\n", current + jcr->wait_time_sum, job->MaxWaitTime); if (job->MaxWaitTime != 0 && (current + jcr->wait_time_sum) >= job->MaxWaitTime) { @@ -611,7 +936,7 @@ static bool job_check_maxruntime(JCR *jcr) JOB *job = jcr->job; utime_t run_time; - if (job_canceled(jcr) || jcr->JobStatus == JS_Created) { + if (job_canceled(jcr) || !jcr->job_started) { return false; } if (jcr->job->MaxRunTime == 0 && job->FullMaxRunTime == 0 && @@ -620,7 +945,7 @@ static bool job_check_maxruntime(JCR *jcr) } run_time = watchdog_time - jcr->start_time; Dmsg7(200, "check_maxruntime %llu-%u=%llu >= %llu|%llu|%llu|%llu\n", - watchdog_time, jcr->start_time, run_time, job->MaxRunTime, job->FullMaxRunTime, + watchdog_time, jcr->start_time, run_time, job->MaxRunTime, job->FullMaxRunTime, job->IncMaxRunTime, job->DiffMaxRunTime); if (jcr->getJobLevel() == L_FULL && job->FullMaxRunTime != 0 && @@ -639,7 +964,7 @@ static bool job_check_maxruntime(JCR *jcr) Dmsg0(200, "check_maxwaittime: Maxcancel\n"); cancel = true; } - + return cancel; } @@ -652,7 +977,7 @@ static bool job_check_maxrunschedtime(JCR *jcr) if (jcr->MaxRunSchedTime == 0 || job_canceled(jcr)) { return false; } - if ((watchdog_time - jcr->sched_time) < jcr->MaxRunSchedTime) { + if ((watchdog_time - jcr->initial_sched_time) < jcr->MaxRunSchedTime) { Dmsg3(200, "Job %p (%s) with MaxRunSchedTime %d not expired\n", jcr, jcr->Job, jcr->MaxRunSchedTime); return false; @@ -677,7 +1002,7 @@ DBId_t get_or_create_pool_record(JCR *jcr, char *pool_name) while (!db_get_pool_record(jcr, jcr->db, &pr)) { /* get by Name */ /* Try to create the pool */ if (create_pool(jcr, jcr->db, jcr->pool, POOL_OP_CREATE) < 0) { - Jmsg(jcr, M_FATAL, 0, _("Pool \"%s\" not in database. ERR=%s"), pr.Name, + Jmsg(jcr, M_FATAL, 0, _("Cannot create pool \"%s\" in database. ERR=%s"), pr.Name, db_strerror(jcr->db)); return 0; } else { @@ -697,7 +1022,9 @@ bool allow_duplicate_job(JCR *jcr) JOB *job = jcr->job; JCR *djcr; /* possible duplicate job */ - if (jcr->no_check_duplicates || job->AllowDuplicateJobs) { + /* Is AllowDuplicateJobs is set or is duplicate checking + * disabled for this job? */ + if (job->AllowDuplicateJobs || jcr->IgnoreDuplicateJobChecking) { return true; } Dmsg0(800, "Enter allow_duplicate_job\n"); @@ -707,19 +1034,26 @@ bool allow_duplicate_job(JCR *jcr) */ foreach_jcr(djcr) { - if (jcr == djcr || djcr->JobId == 0) { + if (jcr == djcr || djcr->is_internal_job() || !djcr->job) { continue; /* do not cancel this job or consoles */ } - if (strcmp(job->name(), djcr->job->name()) == 0) { + /* Does Job has the IgnoreDuplicateJobChecking flag set, + * if so do not check it against other jobs */ + if (djcr->IgnoreDuplicateJobChecking) { + continue; + } + if ((strcmp(job->name(), djcr->job->name()) == 0) && + djcr->getJobType() == jcr->getJobType()) /* A duplicate is about the same name and the same type */ + { bool cancel_dup = false; - bool cancel_me = false; + bool cancel_me = false; if (job->DuplicateJobProximity > 0) { utime_t now = (utime_t)time(NULL); if ((now - djcr->start_time) > job->DuplicateJobProximity) { continue; /* not really a duplicate */ } } - if (job->CancelLowerLevelDuplicates && + if (job->CancelLowerLevelDuplicates && djcr->getJobType() == 'B' && jcr->getJobType() == 'B') { switch (jcr->getJobLevel()) { case L_FULL: @@ -743,15 +1077,16 @@ bool allow_duplicate_job(JCR *jcr) } } /* - * cancel_dup will be done below + * cancel_dup will be done below */ if (cancel_me) { /* Zap current job */ + jcr->setJobStatus(JS_Canceled); Jmsg(jcr, M_FATAL, 0, _("JobId %d already running. Duplicate job not allowed.\n"), djcr->JobId); break; /* get out of foreach_jcr */ } - } + } /* Cancel one of the two jobs (me or dup) */ /* If CancelQueuedDuplicates is set do so only if job is queued */ if (job->CancelQueuedDuplicates) { @@ -763,6 +1098,7 @@ bool allow_duplicate_job(JCR *jcr) case JS_WaitPriority: case JS_WaitMaxJobs: case JS_WaitStartTime: + case JS_WaitDevice: cancel_dup = true; /* cancel queued duplicate */ break; default: @@ -773,13 +1109,15 @@ bool allow_duplicate_job(JCR *jcr) /* Zap the duplicated job djcr */ UAContext *ua = new_ua_context(jcr); Jmsg(jcr, M_INFO, 0, _("Cancelling duplicate JobId=%d.\n"), djcr->JobId); - cancel_job(ua, djcr); + cancel_job(ua, djcr, 60); bmicrosleep(0, 500000); - cancel_job(ua, djcr); + djcr->setJobStatus(JS_Canceled); + cancel_job(ua, djcr, 60); free_ua_context(ua); Dmsg2(800, "Cancel dup %p JobId=%d\n", djcr, djcr->JobId); } else { - /* Zap current job */ + /* Zap current job */ + jcr->setJobStatus(JS_Canceled); Jmsg(jcr, M_FATAL, 0, _("JobId %d already running. Duplicate job not allowed.\n"), djcr->JobId); Dmsg2(800, "Cancel me %p JobId=%d\n", jcr, jcr->JobId); @@ -791,15 +1129,71 @@ bool allow_duplicate_job(JCR *jcr) } endeach_jcr(djcr); - return true; + return true; +} + +/* + * Apply pool overrides to get the storage properly setup. + */ +bool apply_wstorage_overrides(JCR *jcr, POOL *opool) +{ + const char *source; + + Dmsg1(100, "Original pool=%s\n", opool->name()); + if (jcr->cmdline_next_pool_override) { + /* Can be Command line or User input */ + source = NPRT(jcr->next_pool_source); + } else if (jcr->run_next_pool_override) { + pm_strcpy(jcr->next_pool_source, _("Run NextPool override")); + pm_strcpy(jcr->pool_source, _("Run NextPool override")); + source = _("Run NextPool override"); + } else if (jcr->job->next_pool) { + /* Use Job Next Pool */ + jcr->next_pool = jcr->job->next_pool; + pm_strcpy(jcr->next_pool_source, _("Job's NextPool resource")); + pm_strcpy(jcr->pool_source, _("Job's NextPool resource")); + source = _("Job's NextPool resource"); + } else { + /* Default to original pool->NextPool */ + jcr->next_pool = opool->NextPool; + Dmsg1(100, "next_pool=%p\n", jcr->next_pool); + if (jcr->next_pool) { + Dmsg1(100, "Original pool next Pool = %s\n", NPRT(jcr->next_pool->name())); + } + pm_strcpy(jcr->next_pool_source, _("Job Pool's NextPool resource")); + pm_strcpy(jcr->pool_source, _("Job Pool's NextPool resource")); + source = _("Pool's NextPool resource"); + } + + /* + * If the original backup pool has a NextPool, make sure a + * record exists in the database. + */ + if (jcr->next_pool) { + jcr->jr.PoolId = get_or_create_pool_record(jcr, jcr->next_pool->name()); + if (jcr->jr.PoolId == 0) { + return false; + } + } + + if (!set_mac_wstorage(NULL, jcr, jcr->pool, jcr->next_pool, source)) { + return false; + } + + /* Set write pool and source. Not read pool is in rpool. */ + jcr->pool = jcr->next_pool; + pm_strcpy(jcr->pool_source, source); + + return true; } + void apply_pool_overrides(JCR *jcr) { bool pool_override = false; if (jcr->run_pool_override) { - pm_strcpy(jcr->pool_source, _("Run pool override")); + pm_strcpy(jcr->pool_source, _("Run Pool override")); } /* * Apply any level related Pool selections @@ -854,6 +1248,10 @@ bool get_or_create_client_record(JCR *jcr) { CLIENT_DBR cr; + if (!jcr->client) { + Jmsg(jcr, M_FATAL, 0, _("No Client specified.\n")); + return false; + } memset(&cr, 0, sizeof(cr)); bstrncpy(cr.Name, jcr->client->hdr.name, sizeof(cr.Name)); cr.AutoPrune = jcr->client->AutoPrune; @@ -880,12 +1278,13 @@ bool get_or_create_client_record(JCR *jcr) return true; } +/* + * Get or Create FileSet record + */ bool get_or_create_fileset_record(JCR *jcr) { FILESET_DBR fsr; - /* - * Get or Create FileSet record - */ + memset(&fsr, 0, sizeof(FILESET_DBR)); bstrncpy(fsr.FileSet, jcr->fileset->hdr.name, sizeof(fsr.FileSet)); if (jcr->fileset->have_MD5) { @@ -944,7 +1343,7 @@ void update_job_end_record(JCR *jcr) jcr->jr.ReadBytes = jcr->ReadBytes; jcr->jr.VolSessionId = jcr->VolSessionId; jcr->jr.VolSessionTime = jcr->VolSessionTime; - jcr->jr.JobErrors = jcr->JobErrors; + jcr->jr.JobErrors = jcr->JobErrors + jcr->SDErrors; jcr->jr.HasBase = jcr->HasBase; if (!db_update_job_end_record(jcr, jcr->db, &jcr->jr)) { Jmsg(jcr, M_WARNING, 0, _("Error updating job record. %s"), @@ -975,6 +1374,7 @@ void create_unique_job_name(JCR *jcr, const char *base_name) char name[MAX_NAME_LENGTH]; char *p; int len; + int local_seq; /* Guarantee unique start time -- maximum one per second, and * thus unique Job Name @@ -989,6 +1389,7 @@ void create_unique_job_name(JCR *jcr, const char *base_name) } } last_start_time = now; + local_seq = seq; V(mutex); /* allow creation of jobs */ jcr->start_time = now; /* Form Unique JobName */ @@ -998,7 +1399,7 @@ void create_unique_job_name(JCR *jcr, const char *base_name) len = strlen(dt) + 5; /* dt + .%02d EOS */ bstrncpy(name, base_name, sizeof(name)); name[sizeof(name)-len] = 0; /* truncate if too long */ - bsnprintf(jcr->Job, sizeof(jcr->Job), "%s.%s_%02d", name, dt, seq); /* add date & time */ + bsnprintf(jcr->Job, sizeof(jcr->Job), "%s.%s_%02d", name, dt, local_seq); /* add date & time */ /* Convert spaces into underscores */ for (p=jcr->Job; *p; p++) { if (*p == ' ') { @@ -1011,26 +1412,27 @@ void create_unique_job_name(JCR *jcr, const char *base_name) /* Called directly from job rescheduling */ void dird_free_jcr_pointers(JCR *jcr) { + /* Close but do not free bsock packets */ if (jcr->file_bsock) { Dmsg0(200, "Close File bsock\n"); - bnet_close(jcr->file_bsock); - jcr->file_bsock = NULL; + jcr->file_bsock->close(); } if (jcr->store_bsock) { Dmsg0(200, "Close Store bsock\n"); - bnet_close(jcr->store_bsock); - jcr->store_bsock = NULL; + jcr->store_bsock->close(); } bfree_and_null(jcr->sd_auth_key); bfree_and_null(jcr->where); bfree_and_null(jcr->RestoreBootstrap); + jcr->cached_attribute = false; bfree_and_null(jcr->ar); free_and_null_pool_memory(jcr->JobIds); free_and_null_pool_memory(jcr->client_uname); free_and_null_pool_memory(jcr->attr); free_and_null_pool_memory(jcr->fname); + free_and_null_pool_memory(jcr->media_type); } /* @@ -1043,6 +1445,13 @@ void dird_free_jcr(JCR *jcr) Dmsg0(200, "Start dird free_jcr\n"); dird_free_jcr_pointers(jcr); + if (jcr->wjcr) { + free_jcr(jcr->wjcr); + jcr->wjcr = NULL; + } + /* Free bsock packets */ + free_bsock(jcr->file_bsock); + free_bsock(jcr->store_bsock); if (jcr->term_wait_inited) { pthread_cond_destroy(&jcr->term_wait); jcr->term_wait_inited = false; @@ -1060,10 +1469,13 @@ void dird_free_jcr(JCR *jcr) free_and_null_pool_memory(jcr->stime); free_and_null_pool_memory(jcr->fname); free_and_null_pool_memory(jcr->pool_source); + free_and_null_pool_memory(jcr->next_pool_source); free_and_null_pool_memory(jcr->catalog_source); free_and_null_pool_memory(jcr->rpool_source); free_and_null_pool_memory(jcr->wstore_source); free_and_null_pool_memory(jcr->rstore_source); + free_and_null_pool_memory(jcr->next_vol_list); + free_and_null_pool_memory(jcr->component_fname); /* Delete lists setup to hold storage pointers */ free_rwstorage(jcr); @@ -1073,17 +1485,24 @@ void dird_free_jcr(JCR *jcr) if (jcr->JobId != 0) write_state_file(director->working_directory, "bacula-dir", get_first_port_host_order(director->DIRaddrs)); + if (jcr->plugin_config) { + free_plugin_config_items(jcr->plugin_config); + delete jcr->plugin_config; + jcr->plugin_config = NULL; + } free_plugins(jcr); /* release instantiated plugins */ + garbage_collect_memory_pool(); + Dmsg0(200, "End dird free_jcr\n"); } -/* +/* * The Job storage definition must be either in the Job record - * or in the Pool record. The Pool record overrides the Job + * or in the Pool record. The Pool record overrides the Job * record. */ -void get_job_storage(USTORE *store, JOB *job, RUN *run) +void get_job_storage(USTORE *store, JOB *job, RUN *run) { if (run && run->pool && run->pool->storage) { store->store = (STORE *)run->pool->storage->first(); @@ -1125,17 +1544,20 @@ void set_jcr_defaults(JCR *jcr, JOB *job) jcr->setJobLevel(job->JobLevel); break; } - + if (!jcr->next_vol_list) { + jcr->next_vol_list = get_pool_memory(PM_FNAME); + } if (!jcr->fname) { jcr->fname = get_pool_memory(PM_FNAME); } if (!jcr->pool_source) { jcr->pool_source = get_pool_memory(PM_MESSAGE); - pm_strcpy(jcr->pool_source, _("unknown source")); + } + if (!jcr->next_pool_source) { + jcr->next_pool_source = get_pool_memory(PM_MESSAGE); } if (!jcr->catalog_source) { jcr->catalog_source = get_pool_memory(PM_MESSAGE); - pm_strcpy(jcr->catalog_source, _("unknown source")); } jcr->JobPriority = job->Priority; @@ -1146,12 +1568,22 @@ void set_jcr_defaults(JCR *jcr, JOB *job) copy_rwstorage(jcr, job->pool->storage, _("Pool resource")); } jcr->client = job->client; + ASSERT2(jcr->client, "jcr->client==NULL!!!"); if (!jcr->client_name) { jcr->client_name = get_pool_memory(PM_NAME); } - pm_strcpy(jcr->client_name, jcr->client->hdr.name); - pm_strcpy(jcr->pool_source, _("Job resource")); + pm_strcpy(jcr->client_name, jcr->client->name()); jcr->pool = job->pool; + pm_strcpy(jcr->pool_source, _("Job resource")); + if (job->next_pool) { + /* Use Job's Next Pool */ + jcr->next_pool = job->next_pool; + pm_strcpy(jcr->next_pool_source, _("Job's NextPool resource")); + } else { + /* Default to original pool->NextPool */ + jcr->next_pool = job->pool->NextPool; + pm_strcpy(jcr->next_pool_source, _("Job Pool's NextPool resource")); + } jcr->full_pool = job->full_pool; jcr->inc_pool = job->inc_pool; jcr->diff_pool = job->diff_pool; @@ -1163,11 +1595,11 @@ void set_jcr_defaults(JCR *jcr, JOB *job) pm_strcpy(jcr->catalog_source, _("Client resource")); } jcr->fileset = job->fileset; + jcr->accurate = job->accurate; jcr->messages = job->messages; jcr->spool_data = job->spool_data; jcr->spool_size = job->spool_size; jcr->write_part_after_job = job->write_part_after_job; - jcr->accurate = job->accurate; jcr->MaxRunSchedTime = job->MaxRunSchedTime; if (jcr->RestoreBootstrap) { free(jcr->RestoreBootstrap); @@ -1199,7 +1631,7 @@ void set_jcr_defaults(JCR *jcr, JOB *job) } } -/* +/* * Copy the storage definitions from an alist to the JCR */ void copy_rwstorage(JCR *jcr, alist *storage, const char *where) @@ -1230,7 +1662,7 @@ void free_rwstorage(JCR *jcr) free_wstorage(jcr); } -/* +/* * Copy the storage definitions from an alist to the JCR */ void copy_rstorage(JCR *jcr, alist *storage, const char *where) @@ -1292,7 +1724,7 @@ void free_rstorage(JCR *jcr) jcr->rstore = NULL; } -/* +/* * Copy the storage definitions from an alist to the JCR */ void copy_wstorage(JCR *jcr, alist *storage, const char *where) @@ -1357,14 +1789,6 @@ void free_wstorage(JCR *jcr) jcr->wstore = NULL; } -char *job_code_callback_clones(JCR *jcr, const char* param) -{ - if (param[0] == 'p') { - return jcr->pool->name(); - } - return NULL; -} - void create_clones(JCR *jcr) { /* @@ -1378,7 +1802,7 @@ void create_clones(JCR *jcr) UAContext *ua = new_ua_context(jcr); ua->batch = true; foreach_alist(runcmd, job->run_cmds) { - cmd = edit_job_codes(jcr, cmd, runcmd, "", job_code_callback_clones); + cmd = edit_job_codes(jcr, cmd, runcmd, "", job_code_callback_director); Mmsg(ua->cmd, "run %s cloned=yes", cmd); Dmsg1(900, "=============== Clone cmd=%s\n", ua->cmd); parse_ua_args(ua); /* parse command */ @@ -1396,45 +1820,55 @@ void create_clones(JCR *jcr) } /* - * Given: a JobId in jcr->previous_jr.JobId, + * Given: a JobId and FileIndex * this subroutine writes a bsr file to restore that job. * Returns: -1 on error * number of files if OK */ -int create_restore_bootstrap_file(JCR *jcr) +int create_restore_bootstrap_file(JCR *jcr, JobId_t jobid, int findex1, int findex2) { RESTORE_CTX rx; UAContext *ua; int files; memset(&rx, 0, sizeof(rx)); - rx.bsr = new_bsr(); - rx.JobIds = (char *)""; - rx.bsr->JobId = jcr->previous_jr.JobId; + rx.JobIds = (char *)""; + + rx.bsr_list = create_bsr_list(jobid, findex1, findex2); + ua = new_ua_context(jcr); - if (!complete_bsr(ua, rx.bsr)) { + if (!complete_bsr(ua, rx.bsr_list)) { files = -1; goto bail_out; } - rx.bsr->fi = new_findex(); - rx.bsr->fi->findex = 1; - rx.bsr->fi->findex2 = jcr->previous_jr.JobFiles; + jcr->ExpectedFiles = write_bsr_file(ua, rx); if (jcr->ExpectedFiles == 0) { files = 0; goto bail_out; } free_ua_context(ua); - free_bsr(rx.bsr); + free_bsr(rx.bsr_list); jcr->needs_sd = true; return jcr->ExpectedFiles; bail_out: free_ua_context(ua); - free_bsr(rx.bsr); + free_bsr(rx.bsr_list); return files; } +/* + * Given: a JobId in jcr->previous_jr.JobId, + * this subroutine writes a bsr file to restore that job. + * Returns: -1 on error + * number of files if OK + */ +int create_restore_bootstrap_file(JCR *jcr) +{ + return create_restore_bootstrap_file(jcr, jcr->previous_jr.JobId, 1, jcr->previous_jr.JobFiles); +} + /* TODO: redirect command ouput to job log */ bool run_console_command(JCR *jcr, const char *cmd) { @@ -1442,12 +1876,17 @@ bool run_console_command(JCR *jcr, const char *cmd) bool ok; JCR *ljcr = new_control_jcr("-RunScript-", JT_CONSOLE); ua = new_ua_context(ljcr); - /* run from runscript and check if commands are autorized */ + /* run from runscript and check if commands are authorized */ ua->runscript = true; Mmsg(ua->cmd, "%s", cmd); Dmsg1(100, "Console command: %s\n", ua->cmd); parse_ua_args(ua); - ok= do_a_command(ua); + if (ua->argc > 0 && ua->argk[0][0] == '.') { + ok = do_a_dot_command(ua); + } else { + ok = do_a_command(ua); + } + close_db(ua); free_ua_context(ua); free_jcr(ljcr); return ok;