2 * Manipulation routines for Job Control Records and
3 * handling of last_jobs_list.
5 * Kern E. Sibbald, December 2000
9 * These routines are thread safe.
11 * The job list routines were re-written in May 2005 to
12 * eliminate the global lock while traversing the list, and
13 * to use the dlist subroutines. The locking is now done
14 * on the list each time the list is modified or traversed.
15 * That is, it is "micro-locked" rather than globally locked.
16 * The result is that there is one lock/unlock for each entry
17 * in the list while traversing it rather than a single lock
18 * at the beginning of a traversal and one at the end. This
19 * incurs slightly more overhead, but effectively eliminates
20 * the possibility of race conditions. In addition, with the
21 * exception of the global locking of the list during the
22 * re-reading of the config file, no recursion is needed.
26 Copyright (C) 2000-2005 Kern Sibbald
28 This program is free software; you can redistribute it and/or
29 modify it under the terms of the GNU General Public License
30 version 2 as amended with additional clauses defined in the
31 file LICENSE in the main source directory.
33 This program is distributed in the hope that it will be useful,
34 but WITHOUT ANY WARRANTY; without even the implied warranty of
35 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
36 file LICENSE for additional details.
43 /* External variables we reference */
/* watchdog_time is compared with each socket timer_start value in jcr_timeout_check(). */
44 extern time_t watchdog_time;
46 /* Forward referenced functions */
47 extern "C" void timeout_handler(int sig);
48 static void jcr_timeout_check(watchdog_t *self);
/*
 * With TRACE_JCR_CHAIN defined, lock_jcr_chain() and unlock_jcr_chain() are
 * macros that forward the call site (__FILE__, __LINE__) to the b_* variants.
 * NOTE(review): both macro bodies end in a semicolon, so a call written as
 * lock_jcr_chain(); expands to an extra empty statement.
 * NOTE(review): the preprocessor lines that separate the macro form from the
 * plain prototypes further down are not visible in this listing.
 */
49 #ifdef TRACE_JCR_CHAIN
50 static void b_lock_jcr_chain(const char *filen, int line);
51 static void b_unlock_jcr_chain(const char *filen, int line);
52 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
53 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
55 static void lock_jcr_chain();
56 static void unlock_jcr_chain();
/* List of finished-job summaries; entries are struct s_last_job linked through their link member. */
61 dlist *last_jobs = NULL;
/* Size bound for last_jobs: once the list grows past this, its first entry is removed. */
62 const int max_last_jobs = 10;
64 static dlist *jcrs = NULL; /* JCR chain */
/* NOTE(review): the statements that lock and unlock these two mutexes are not visible in this listing. */
65 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
67 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Allocate the two dlist objects used by this file: last_jobs (entries
 * linked through the link member of struct s_last_job) and jcrs (entries
 * linked through the link member of jcr).
 *
 * job_entry is only a typed NULL pointer used to describe the link member
 * to the dlist constructor; it is never dereferenced for data here.
 *
 * NOTE(review): the braces, the declaration of jcr, and any guards around
 * the two allocations are not visible in this listing.
 */
79 void init_last_jobs_list()
82 struct s_last_job *job_entry = NULL;
84 last_jobs = New(dlist(job_entry, &job_entry->link));
87 jcrs = New(dlist(jcr, &jcr->link));
/*
 * Drain the last_jobs list by repeatedly detaching its first entry until
 * the list reports empty.
 *
 * NOTE(review): the statements that release each detached entry and the
 * list object itself are not visible in this listing.
 */
91 void term_last_jobs_list()
94 while (!last_jobs->empty()) {
95 void *je = last_jobs->first();
96 last_jobs->remove(je);
/*
 * Load saved last-job records from file descriptor fd, starting at byte
 * offset addr, and append them to last_jobs.
 *
 * Layout as read here: an item count (num) followed by that many raw
 * struct s_last_job images.
 *
 * NOTE(review): the declaration of num, the statements executed after each
 * failed check, and the closing braces are not visible in this listing.
 */
108 void read_last_jobs_list(int fd, uint64_t addr)
110 struct s_last_job *je, job;
/* NOTE(review): addr is narrowed to int for this debug message only. */
113 Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
/* An addr of 0 is handled the same way as a failed seek. */
114 if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
/* A short or failed read of the count is treated as an error. */
117 if (read(fd, &num, sizeof(num)) != sizeof(num)) {
120 Dmsg1(100, "Read num_items=%d\n", num);
121 if (num > 4 * max_last_jobs) { /* sanity check */
124 for ( ; num; num--) {
125 if (read(fd, &job, sizeof(job)) != sizeof(job)) {
126 Dmsg1(000, "Read job entry. ERR=%s\n", strerror(errno));
/* Copy the record just read into a heap entry that is handed to the list. */
/* NOTE(review): no check of the malloc() result is visible before the memcpy(). */
130 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
131 memcpy((char *)je, (char *)&job, sizeof(job));
133 init_last_jobs_list();
135 last_jobs->append(je);
/* Keep the list bounded: past max_last_jobs entries, detach the first one. */
136 if (last_jobs->size() > max_last_jobs) {
137 je = (struct s_last_job *)last_jobs->first();
138 last_jobs->remove(je);
/*
 * Write the last_jobs list to file descriptor fd starting at byte offset
 * addr: first the entry count, then one raw struct s_last_job image per
 * list entry.
 *
 * NOTE(review): the declaration of num and all return statements are not
 * visible in this listing; the visible code ends by querying the current
 * file offset.
 */
145 uint64_t write_last_jobs_list(int fd, uint64_t addr)
147 struct s_last_job *je;
/* NOTE(review): addr is narrowed to int for this debug message only. */
150 Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
151 if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
155 /* First record is number of entries */
156 num = last_jobs->size();
157 if (write(fd, &num, sizeof(num)) != sizeof(num)) {
158 Dmsg1(000, "Error writing num_items: ERR=%s\n", strerror(errno));
/* A short write of any entry is reported as an error. */
161 foreach_dlist(je, last_jobs) {
162 if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
163 Dmsg1(000, "Error writing job: ERR=%s\n", strerror(errno));
168 /* Return current address */
/* NOTE(review): lseek() returns off_t; storing it in ssize_t may narrow it on some platforms - confirm. */
169 ssize_t stat = lseek(fd, 0, SEEK_CUR);
/*
 * Lock the last_jobs list for the caller.
 * NOTE(review): the locking statement itself is not visible in this
 * listing; the comment below says the JCR chain mutex is used.
 */
177 void lock_last_jobs_list()
179 /* Use jcr chain mutex */
/*
 * Unlock the last_jobs list for the caller.
 * NOTE(review): the unlocking statement itself is not visible in this
 * listing; the comment below says the JCR chain mutex is used.
 */
183 void unlock_last_jobs_list()
185 /* Use jcr chain mutex */
190 * Push a subroutine address into the job end callback stack
/*
 * Register an end-of-job callback on jcr.
 *
 *   jcr         job control record that owns the callback stack
 *   job_end_cb  routine to call later with (jcr, ctx)
 *   ctx         opaque pointer handed back to job_end_cb
 *
 * Each registration occupies two consecutive slots in jcr->job_end_push:
 * the callback address first, then its ctx. job_end_pop() relies on this
 * pairing.
 */
192 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
194 jcr->job_end_push.append((void *)job_end_cb);
195 jcr->job_end_push.append(ctx);
198 /* Pop each job_end subroutine and call it.
 *
 * Entries are stored as (callback, ctx) pairs, so the loop starts at the
 * last index and works downward, reading ctx first and then the callback
 * stored just before it. The most recently pushed pair is therefore called
 * first.
 *
 * NOTE(review): the declaration of ctx and the closing braces are not
 * visible in this listing.
 */
199 static void job_end_pop(JCR *jcr)
201 void (*job_end_cb)(JCR *jcr, void *ctx);
/* i drops by two per pass; the loop stops once no complete pair remains. */
203 for (int i=jcr->job_end_push.size()-1; i > 0; ) {
204 ctx = jcr->job_end_push.get(i--);
205 job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
206 job_end_cb(jcr, ctx);
211 * Create a Job Control Record and link it into JCR chain
212 * Returns newly allocated JCR
213 * Note, since each daemon has a different JCR, he passes
/*
 * Allocate and initialise a job control record.
 *
 *   size             number of bytes to allocate for the record
 *   daemon_free_jcr  routine stored in the record and called by free_jcr()
 *                    before the common teardown
 *
 * The record is zero-filled, tagged with the calling thread id, given a
 * message queue, an end-callback stack, pool buffers for VolumeName and
 * errmsg, and placeholder job identity values. The handler for
 * TIMEOUT_SIGNAL is (re)installed on every call.
 *
 * NOTE(review): the declaration of jcr, the locking statements, the step
 * that links the record into jcrs, and the return statement are not visible
 * in this listing.
 */
216 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
219 MQUEUE_ITEM *item = NULL;
220 struct sigaction sigtimer;
222 Dmsg0(3400, "Enter new_jcr\n");
/* NOTE(review): no check of the malloc() result is visible before the memset(). */
223 jcr = (JCR *)malloc(size);
224 memset(jcr, 0, size);
225 jcr->my_thread_id = pthread_self();
/* item is only a typed NULL pointer describing the link member to dlist. */
226 jcr->msg_queue = New(dlist(item, &item->link));
227 jcr->job_end_push.init(1, false);
228 jcr->sched_time = time(NULL);
229 jcr->daemon_free_jcr = daemon_free_jcr; /* plug daemon free routine */
231 pthread_mutex_init(&(jcr->mutex), NULL);
232 jcr->JobStatus = JS_Created; /* ready to run */
233 jcr->VolumeName = get_pool_memory(PM_FNAME);
234 jcr->VolumeName[0] = 0;
235 jcr->errmsg = get_pool_memory(PM_MESSAGE);
237 /* Setup some dummy values */
238 bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
240 jcr->JobType = JT_SYSTEM; /* internal job until defined */
241 jcr->JobLevel = L_NONE;
/* NOTE(review): JobStatus was already set to JS_Created above; this repeats it. */
242 jcr->JobStatus = JS_Created;
/* Install timeout_handler for TIMEOUT_SIGNAL with all signals blocked while it runs. */
244 sigtimer.sa_flags = 0;
245 sigtimer.sa_handler = timeout_handler;
246 sigfillset(&sigtimer.sa_mask);
247 sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
250 * Locking jobs is a global lock that is needed
251 * so that the Director can stop new jobs from being
252 * added to the jcr chain while it processes a new
253 * conf file and does the job_end_push().
/* NOTE(review): any guard around this allocation of the chain is not visible in this listing. */
258 jcrs = New(dlist(jcr, &jcr->link));
269 * Remove a JCR from the chain
270 * NOTE! The chain must be locked prior to calling
/*
 * Detach jcr from the JCR chain. A NULL jcr is reported with M_ABORT.
 *
 * NOTE(review): the NULL test and the statement that actually removes the
 * record from the chain are not visible in this listing.
 */
273 static void remove_jcr(JCR *jcr)
275 Dmsg0(3400, "Enter remove_jcr\n");
277 Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
280 Dmsg0(3400, "Leave remove_jcr\n");
284 * Free stuff common to all JCRs. N.B. Be careful to include only
285 * generic stuff in the common part of the jcr.
/*
 * Release the parts of a JCR that are common to all daemons.
 *
 * Steps visible here, in order:
 *   1. Build a struct s_last_job summary from the record and, when JobId is
 *      greater than 0, append a heap copy of it to last_jobs, trimming the
 *      list when it grows past max_last_jobs.
 *   2. Destroy the record mutex, delete its message queue and close its
 *      messages.
 *   3. Free the pool buffers, auth key and Director socket owned by the
 *      record, then the getuser and getgroup caches.
 *
 * NOTE(review): the case labels of the switch, several guards, the locking
 * around last_jobs, and the final release of the record itself are not
 * visible in this listing.
 */
287 static void free_common_jcr(JCR *jcr)
289 struct s_last_job *je, last_job;
291 /* Keep some statistics */
292 switch (jcr->JobType) {
300 last_job.Errors = jcr->Errors;
301 last_job.JobType = jcr->JobType;
302 last_job.JobId = jcr->JobId;
303 last_job.VolSessionId = jcr->VolSessionId;
304 last_job.VolSessionTime = jcr->VolSessionTime;
305 bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
306 last_job.JobFiles = jcr->JobFiles;
307 last_job.JobBytes = jcr->JobBytes;
308 last_job.JobStatus = jcr->JobStatus;
309 last_job.JobLevel = jcr->JobLevel;
310 last_job.start_time = jcr->start_time;
/* The end time is taken now, at teardown, not read from the record. */
311 last_job.end_time = time(NULL);
312 /* Keep list of last jobs, but not Console where JobId==0 */
313 if (last_job.JobId > 0) {
/* NOTE(review): no check of the malloc() result is visible before the memcpy(). */
314 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
315 memcpy((char *)je, (char *)&last_job, sizeof(last_job));
317 init_last_jobs_list();
319 last_jobs->append(je);
/* Keep the list bounded: past max_last_jobs entries, detach the first one. */
320 if (last_jobs->size() > max_last_jobs) {
321 je = (struct s_last_job *)last_jobs->first();
322 last_jobs->remove(je);
330 pthread_mutex_destroy(&jcr->mutex);
332 delete jcr->msg_queue;
333 close_msg(jcr); /* close messages for this job */
335 /* do this after closing messages */
336 if (jcr->client_name) {
337 free_pool_memory(jcr->client_name);
338 jcr->client_name = NULL;
/* NOTE(review): any guard around this free is not visible in this listing. */
342 free_pool_memory(jcr->attr);
/* sd_auth_key is released with free(), unlike the pool buffers around it. */
346 if (jcr->sd_auth_key) {
347 free(jcr->sd_auth_key);
348 jcr->sd_auth_key = NULL;
350 if (jcr->VolumeName) {
351 free_pool_memory(jcr->VolumeName);
352 jcr->VolumeName = NULL;
355 if (jcr->dir_bsock) {
356 bnet_close(jcr->dir_bsock);
357 jcr->dir_bsock = NULL;
/* NOTE(review): any guard around this free is not visible in this listing. */
360 free_pool_memory(jcr->errmsg);
367 if (jcr->cached_path) {
368 free_pool_memory(jcr->cached_path);
369 jcr->cached_path = NULL;
372 free_getuser_cache();
373 free_getgroup_cache();
378 * Global routine to free a jcr
/*
 * Drop one reference to jcr and tear the record down when no references
 * remain.
 *
 * Two entry points share the body below: b_free_jcr() additionally logs
 * the file and line of the caller, free_jcr() does not.
 * NOTE(review): the preprocessor lines that select between the two
 * signatures, the locking statements, and the early return taken while the
 * record is still in use are not visible in this listing.
 *
 * Teardown order once the use count is no longer positive: unlink from the
 * chain, run the end-of-job callbacks, call the daemon free routine if one
 * is set, free the common parts, then flush daemon messages.
 */
381 void b_free_jcr(const char *file, int line, JCR *jcr)
383 Dmsg3(3400, "Enter free_jcr 0x%x from %s:%d\n", jcr, file, line);
387 void free_jcr(JCR *jcr)
390 Dmsg1(3400, "Enter free_jcr 0x%x\n", jcr);
/* Queued messages are dequeued before the reference is dropped. */
394 dequeue_messages(jcr);
396 jcr->dec_use_count(); /* decrement use count */
/* A negative count is reported as an error. */
397 if (jcr->use_count < 0) {
398 Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
399 jcr->use_count, jcr->JobId);
401 Dmsg3(3400, "Dec free_jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count, jcr->JobId);
402 if (jcr->use_count > 0) { /* if in use */
404 Dmsg2(3400, "free_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
408 remove_jcr(jcr); /* remove Jcr from chain */
411 job_end_pop(jcr); /* pop and call hooked routines */
413 Dmsg1(3400, "End job=%d\n", jcr->JobId);
414 if (jcr->daemon_free_jcr) {
415 jcr->daemon_free_jcr(jcr); /* call daemon free routine */
417 free_common_jcr(jcr);
418 close_msg(NULL); /* flush any daemon messages */
419 Dmsg0(3400, "Exit free_jcr\n");
424 * Given a JobId, find the JCR
425 * Returns: jcr on success
/*
 * Search the JCR chain for a record whose JobId equals the argument.
 *
 * The chain is locked for the search. A matching record has its use count
 * incremented before it is handed back, so the caller owns one reference
 * and is expected to release it (see free_jcr).
 *
 * NOTE(review): the declaration of jcr, the exit from the loop, the unlock
 * and the return statement are not visible in this listing.
 */
428 JCR *get_jcr_by_id(uint32_t JobId)
432 lock_jcr_chain(); /* lock chain */
433 foreach_dlist(jcr, jcrs) {
434 if (jcr->JobId == JobId) {
435 jcr->inc_use_count();
436 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
445 * Given a SessionId and SessionTime, find the JCR
446 * Returns: jcr on success
/*
 * Search the JCR chain for a record whose VolSessionId and VolSessionTime
 * both equal the arguments.
 *
 * A matching record has its use count incremented before it is handed
 * back, so the caller owns one reference.
 *
 * NOTE(review): the declaration of jcr, the lock and unlock calls, the exit
 * from the loop and the return statement are not visible in this listing.
 */
449 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
454 foreach_dlist(jcr, jcrs) {
455 if (jcr->VolSessionId == SessionId &&
456 jcr->VolSessionTime == SessionTime) {
457 jcr->inc_use_count();
458 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
468 * Given a Job, find the JCR
469 * compares on the number of characters in Job
470 * thus allowing partial matches.
471 * Returns: jcr on success
/*
 * Search the JCR chain for a record whose Job name starts with the same
 * len characters as the argument Job.
 *
 * A matching record has its use count incremented before it is handed
 * back, so the caller owns one reference.
 *
 * NOTE(review): the declarations of jcr and len (presumably the length of
 * Job - confirm), any NULL check on Job, the lock and unlock calls, the
 * exit from the loop and the return statement are not visible in this
 * listing.
 */
474 JCR *get_jcr_by_partial_name(char *Job)
484 foreach_dlist(jcr, jcrs) {
485 if (strncmp(Job, jcr->Job, len) == 0) {
486 jcr->inc_use_count();
487 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
497 * Given a Job, find the JCR
498 * requires an exact match of names.
499 * Returns: jcr on success
/*
 * Search the JCR chain for a record whose Job name is exactly equal to the
 * argument Job.
 *
 * A matching record has its use count incremented before it is handed
 * back, so the caller owns one reference.
 *
 * NOTE(review): the declaration of jcr, any NULL check on Job, the lock and
 * unlock calls, the exit from the loop and the return statement are not
 * visible in this listing.
 */
502 JCR *get_jcr_by_full_name(char *Job)
510 foreach_dlist(jcr, jcrs) {
511 if (strcmp(jcr->Job, Job) == 0) {
512 jcr->inc_use_count();
513 Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
/*
 * Update jcr->JobStatus to JobStatus unless the current status is one of
 * the error states that must be preserved.
 *
 * NOTE(review): only the JS_ErrorTerminated case label is visible in this
 * listing; the other preserved states and the label under which the
 * assignment runs are not shown.
 */
521 void set_jcr_job_status(JCR *jcr, int JobStatus)
524 * For a set of errors, ... keep the current status
525 * so it isn't lost. For all others, set it.
527 switch (jcr->JobStatus) {
528 case JS_ErrorTerminated:
535 jcr->JobStatus = JobStatus;
/*
 * Lock and unlock routines for the JCR chain.
 *
 * Each routine has two signatures selected by TRACE_JCR_CHAIN: the b_*
 * form receives the file name and line of the caller and logs them with a
 * running lock_count, the plain form takes no arguments.
 *
 * NOTE(review): the preprocessor lines closing each conditional, the
 * remaining Dmsg3 arguments, and the mutex lock and unlock statements are
 * not visible in this listing.
 */
539 #ifdef TRACE_JCR_CHAIN
540 static int lock_count = 0;
546 #ifdef TRACE_JCR_CHAIN
547 static void b_lock_jcr_chain(const char *fname, int line)
549 static void lock_jcr_chain()
552 #ifdef TRACE_JCR_CHAIN
/* The counter is incremented before it is printed on lock. */
553 Dmsg3(3400, "Lock jcr chain %d from %s:%d\n", ++lock_count,
562 #ifdef TRACE_JCR_CHAIN
563 static void b_unlock_jcr_chain(const char *fname, int line)
565 static void unlock_jcr_chain()
568 #ifdef TRACE_JCR_CHAIN
/* The counter is printed first and decremented afterwards on unlock. */
569 Dmsg3(3400, "Unlock jcr chain %d from %s:%d\n", lock_count--,
/*
 * Step to the record that follows prev_jcr in the JCR chain.
 *
 * The record found has its use count incremented before it is handed back,
 * so the caller owns one reference.
 *
 * NOTE(review): the declaration of jcr, the lock and unlock calls, the test
 * for the end of the chain and the return statement are not visible in this
 * listing.
 */
576 JCR *get_next_jcr(JCR *prev_jcr)
581 jcr = (JCR *)jcrs->next(prev_jcr);
583 jcr->inc_use_count();
584 Dmsg2(3400, "Inc get_next_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
/*
 * Create and register a repeating watchdog that calls jcr_timeout_check().
 *
 * The watchdog is not one-shot and uses a fixed interval of 30 (presumably
 * seconds - confirm against the watchdog code).
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
590 bool init_jcr_subsystem(void)
592 watchdog_t *wd = new_watchdog();
594 wd->one_shot = false;
595 wd->interval = 30; /* FIXME: should be configurable somewhere, even
596 if only with a #define */
597 wd->callback = jcr_timeout_check;
599 register_watchdog(wd);
/*
 * Watchdog callback: look for job threads that have been blocked on a
 * socket for longer than the timeout of that socket.
 *
 * For each socket checked, a non-zero timer_start that lies more than
 * fd->timeout before watchdog_time causes the timer to be cleared, the
 * socket to be flagged timed_out, an M_ERROR job message to be queued, and
 * TIMEOUT_SIGNAL to be sent to the thread recorded in the JCR.
 *
 * The same check is applied three times per JCR: to store_bsock, to
 * file_bsock, and to a third socket whose message names the Director.
 *
 * NOTE(review): the declarations of jcr, fd and timer_start, the loop over
 * the JCRs, the handling of records with JobId 0, the NULL tests on each
 * socket, and the assignment of fd for the third check are not visible in
 * this listing.
 */
604 static void jcr_timeout_check(watchdog_t *self)
610 Dmsg0(3400, "Start JCR timeout checks\n");
612 /* Walk through all JCRs checking if any one is
613 * blocked for more than specified max time.
616 if (jcr->JobId == 0) {
620 fd = jcr->store_bsock;
622 timer_start = fd->timer_start;
623 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
624 fd->timer_start = 0; /* turn off timer */
625 fd->timed_out = TRUE;
626 Jmsg(jcr, M_ERROR, 0, _(
627 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
628 watchdog_time - timer_start);
/* Signal the job thread; timeout_handler() is the installed handler for this signal. */
629 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
/* Same check for the File daemon socket. */
632 fd = jcr->file_bsock;
634 timer_start = fd->timer_start;
635 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
636 fd->timer_start = 0; /* turn off timer */
637 fd->timed_out = TRUE;
638 Jmsg(jcr, M_ERROR, 0, _(
639 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
640 watchdog_time - timer_start);
641 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
/* Same check for the third socket; its fd assignment is not visible in this listing. */
646 timer_start = fd->timer_start;
647 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
648 fd->timer_start = 0; /* turn off timer */
649 fd->timed_out = TRUE;
650 Jmsg(jcr, M_ERROR, 0, _(
651 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
652 watchdog_time - timer_start);
653 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
659 Dmsg0(3400, "Finished JCR timeout checks\n");
663 * Timeout signal comes here
/*
 * Handler installed for TIMEOUT_SIGNAL by new_jcr().
 *
 * It does no work of its own: delivering the signal is what interrupts the
 * function the target thread was executing. sig is unused in the visible
 * code.
 */
665 extern "C" void timeout_handler(int sig)
667 return; /* thus interrupting the function */