2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of John Walker.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Manipulation routines for Job Control Records and
30 * handling of last_jobs_list.
32 * Kern E. Sibbald, December 2000
36 * These routines are thread safe.
38 * The job list routines were re-written in May 2005 to
39 * eliminate the global lock while traversing the list, and
40 * to use the dlist subroutines. The locking is now done
41 * on the list each time the list is modified or traversed.
42 * That is, it is "micro-locked" rather than globally locked.
43 * The result is that there is one lock/unlock for each entry
44 * in the list while traversing it rather than a single lock
45 * at the beginning of a traversal and one at the end. This
46 * incurs slightly more overhead, but effectively eliminates
47 * the possibility of race conditions. In addition, with the
48 * exception of the global locking of the list during the
49 * re-reading of the config file, no recursion is needed.
/* Debug level passed to the Dmsg calls in this file that trace JCR handling. */
56 const int dbglvl = 3400;
59 * Setting a NULL in tsd doesn't clear the tsd but instead tells
60 * pthreads not to call the tsd destructor. Consequently, we
61 * define this *invalid* jcr address and stuff it in the tsd
62 * when the jcr is no longer valid.
/* Sentinel stored in thread-specific data by free_common_jcr();
 * get_jcr_from_tsd() compares the stored value against it. */
64 #define INVALID_JCR ((JCR *)(-1))
66 /* External variables we reference */
/* Compared against each bsock's timer_start in jcr_timeout_check(). */
67 extern time_t watchdog_time;
69 /* External referenced functions */
70 void free_bregexps(alist *bregexps);
72 /* Forward referenced functions */
/* Handler installed for TIMEOUT_SIGNAL by new_jcr(). */
73 extern "C" void timeout_handler(int sig);
/* Watchdog callback registered by init_jcr_subsystem(). */
74 static void jcr_timeout_check(watchdog_t *self);
/* With TRACE_JCR_CHAIN defined, lock_jcr_chain()/unlock_jcr_chain() are macros
 * that forward the call site (__FILE__, __LINE__) to the b_* variants.
 * NOTE(review): the #else/#endif separating the two variants is not visible
 * in this excerpt. */
75 #ifdef TRACE_JCR_CHAIN
76 static void b_lock_jcr_chain(const char *filen, int line);
77 static void b_unlock_jcr_chain(const char *filen, int line);
78 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
79 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
81 static void lock_jcr_chain();
82 static void unlock_jcr_chain();
/* List of struct s_last_job entries; appended to by read_last_jobs_list()
 * and free_jcr(). */
87 dlist *last_jobs = NULL;
/* Once the list grows past this size, the entry returned by first() is
 * removed after each append. */
88 const int max_last_jobs = 10;
90 static dlist *jcrs = NULL; /* JCR chain */
/* NOTE(review): presumably taken by lock_jcr_chain()/unlock_jcr_chain();
 * their bodies are not visible here -- confirm. */
91 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
/* NOTE(review): no use of this mutex is visible in this excerpt. */
93 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
/* NOTE(review): presumably taken by lock_last_jobs_list()/unlock_last_jobs_list();
 * their bodies are not visible here -- confirm. */
95 static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER;
97 static pthread_key_t jcr_key; /* Pointer to jcr for each thread */
/* Passed to pthread_once() in new_jcr() so that create_jcr_key() runs once. */
99 pthread_once_t key_once = PTHREAD_ONCE_INIT;
/*
 * Allocate the two lists used by this file: last_jobs (entries are
 * struct s_last_job) and jcrs (the JCR chain).
 * NOTE(review): any guards around the two allocations are not visible
 * in this excerpt.
 */
112 void init_last_jobs_list()
/* NULL typed pointer; presumably only used so dlist can locate the embedded
 * link member -- confirm against the dlist implementation. */
115 struct s_last_job *job_entry = NULL;
117 last_jobs = New(dlist(job_entry, &job_entry->link));
120 jcrs = New(dlist(jcr, &jcr->link));
/*
 * Empty last_jobs by repeatedly removing the entry at the front.
 * NOTE(review): the release of each removed entry and of the list objects
 * themselves is not visible in this excerpt.
 */
124 void term_last_jobs_list()
127 while (!last_jobs->empty()) {
128 void *je = last_jobs->first();
129 last_jobs->remove(je);
/*
 * Read a saved last-jobs list from file descriptor fd, starting at offset addr.
 * The data is a count followed by that many raw struct s_last_job images.
 * Each record is copied into a freshly malloc'ed entry and appended to
 * last_jobs; when the list exceeds max_last_jobs the front entry is removed.
 * Returns bool; the actual return statements are not visible in this excerpt.
 */
141 bool read_last_jobs_list(int fd, uint64_t addr)
143 struct s_last_job *je, job;
/* NOTE(review): addr is 64-bit but is narrowed to int for this debug message. */
146 Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
/* An address of 0 or a failed seek is handled by the same branch. */
147 if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
150 if (read(fd, &num, sizeof(num)) != sizeof(num)) {
153 Dmsg1(100, "Read num_items=%d\n", num);
154 if (num > 4 * max_last_jobs) { /* sanity check */
157 for ( ; num; num--) {
/* A short read is reported through Pmsg1 below. */
158 if (read(fd, &job, sizeof(job)) != sizeof(job)) {
160 Pmsg1(000, "Read job entry. ERR=%s\n", be.bstrerror());
/* NOTE(review): the malloc() result is passed straight to memcpy() with no
 * NULL check. */
164 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
165 memcpy((char *)je, (char *)&job, sizeof(job));
/* NOTE(review): presumably guarded by a check that last_jobs is still
 * unallocated; the guard is not visible here. */
167 init_last_jobs_list();
169 last_jobs->append(je);
/* Keep the list bounded: drop the front entry once over the limit. */
170 if (last_jobs->size() > max_last_jobs) {
171 je = (struct s_last_job *)last_jobs->first();
172 last_jobs->remove(je);
/*
 * Write last_jobs to file descriptor fd starting at offset addr: first the
 * number of entries, then each entry as a raw struct s_last_job image.
 * Write failures are reported through Pmsg1. The file position after the
 * writes is fetched with lseek(fd, 0, SEEK_CUR).
 * Returns uint64_t; the return statements are not visible in this excerpt.
 */
180 uint64_t write_last_jobs_list(int fd, uint64_t addr)
182 struct s_last_job *je;
/* NOTE(review): addr is 64-bit but is narrowed to int for this debug message. */
185 Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
186 if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
190 /* First record is number of entries */
191 num = last_jobs->size();
192 if (write(fd, &num, sizeof(num)) != sizeof(num)) {
194 Pmsg1(000, "Error writing num_items: ERR=%s\n", be.bstrerror());
197 foreach_dlist(je, last_jobs) {
198 if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
200 Pmsg1(000, "Error writing job: ERR=%s\n", be.bstrerror());
205 /* Return current address */
/* NOTE(review): lseek() returns off_t; storing it in ssize_t may narrow the
 * value on platforms where the two types differ in width. */
206 ssize_t stat = lseek(fd, 0, SEEK_CUR);
/* Lock the last_jobs list.
 * NOTE(review): body not visible here; presumably locks last_jobs_mutex -- confirm. */
214 void lock_last_jobs_list()
/* Unlock the last_jobs list.
 * NOTE(review): body not visible here; presumably unlocks last_jobs_mutex -- confirm. */
219 void unlock_last_jobs_list()
225 * Push a subroutine address into the job end callback stack
/*
 * Register job_end_cb to be called with ctx by job_end_pop().
 * Two items are appended per registration: the callback pointer first,
 * then its context pointer.
 */
227 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
229 jcr->job_end_push.append((void *)job_end_cb);
230 jcr->job_end_push.append(ctx);
233 /* Pop each job_end subroutine and call it */
/*
 * Call every callback registered with job_end_push(), most recently
 * registered first. Entries are read from the end of the list in pairs:
 * the context (stored second) is fetched first, then the callback.
 */
234 static void job_end_pop(JCR *jcr)
236 void (*job_end_cb)(JCR *jcr, void *ctx);
/* i > 0 because each iteration consumes indexes i and i-1. */
238 for (int i=jcr->job_end_push.size()-1; i > 0; ) {
239 ctx = jcr->job_end_push.get(i--);
240 job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
241 job_end_cb(jcr, ctx);
/*
 * Create the thread-specific-data key jcr_key, with no destructor (NULL).
 * Run through pthread_once() from new_jcr().
 * The Jmsg1 below reports a failure with M_ABORT; the status test guarding
 * it is not visible in this excerpt.
 */
245 void create_jcr_key()
247 int status = pthread_key_create(&jcr_key, NULL);
250 Jmsg1(NULL, M_ABORT, 0, _("pthread key create failed: ERR=%s\n"),
251 be.bstrerror(status));
256 * Create a Job Control Record and link it into JCR chain
257 * Returns newly allocated JCR
258 * Note, since each daemon has a different JCR, he passes
/*
 * Allocate and initialise a JCR of `size` bytes.
 *   size            - number of bytes to allocate and zero for the JCR
 *   daemon_free_jcr - daemon routine stored in the JCR; free_jcr() calls it
 * The visible steps: make sure the TSD key exists, zero the JCR, record the
 * creating thread, create the message queue and job-end callback list, take
 * a use-count reference, allocate VolumeName/errmsg pool buffers, fill in
 * placeholder job identity, and install timeout_handler for TIMEOUT_SIGNAL.
 * NOTE(review): the lines that link the JCR into the chain and return it are
 * not visible in this excerpt.
 */
261 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
264 MQUEUE_ITEM *item = NULL;
265 struct sigaction sigtimer;
268 Dmsg0(dbglvl, "Enter new_jcr\n");
/* create_jcr_key() runs only on the first call. */
269 status = pthread_once(&key_once, create_jcr_key);
272 Jmsg1(NULL, M_ABORT, 0, _("pthread_once failed. ERR=%s\n"), be.bstrerror(status));
/* NOTE(review): the malloc() result is passed to memset() with no NULL check. */
274 jcr = (JCR *)malloc(size);
275 memset(jcr, 0, size);
/* Saved so jcr_timeout_check() can pthread_kill() this thread. */
276 jcr->my_thread_id = pthread_self();
277 jcr->msg_queue = New(dlist(item, &item->link));
278 jcr->job_end_push.init(1, false);
279 jcr->sched_time = time(NULL);
280 jcr->daemon_free_jcr = daemon_free_jcr; /* plug daemon free routine */
282 jcr->inc_use_count();
283 jcr->VolumeName = get_pool_memory(PM_FNAME);
284 jcr->VolumeName[0] = 0;
285 jcr->errmsg = get_pool_memory(PM_MESSAGE);
287 /* Setup some dummy values */
288 bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
290 jcr->JobType = JT_SYSTEM; /* internal job until defined */
291 jcr->JobLevel = L_NONE;
292 set_jcr_job_status(jcr, JS_Created); /* ready to run */
/* Install timeout_handler with every signal blocked while it runs. */
294 sigtimer.sa_flags = 0;
295 sigtimer.sa_handler = timeout_handler;
296 sigfillset(&sigtimer.sa_mask);
297 sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
300 * Locking jobs is a global lock that is needed
301 * so that the Director can stop new jobs from being
302 * added to the jcr chain while it processes a new
303 * conf file and does the job_end_push().
/* NOTE(review): presumably only executed when jcrs has not been allocated
 * yet; the guard is not visible here. */
308 jcrs = New(dlist(jcr, &jcr->link));
319 * Remove a JCR from the chain
320 * NOTE! The chain must be locked prior to calling
/*
 * Remove jcr from the JCR chain; called from free_jcr().
 * A NULL jcr is reported with M_ABORT (the test itself is not visible here).
 * NOTE(review): the statement that performs the removal is not visible
 * in this excerpt.
 */
323 static void remove_jcr(JCR *jcr)
325 Dmsg0(dbglvl, "Enter remove_jcr\n");
327 Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
330 Dmsg0(dbglvl, "Leave remove_jcr\n");
334 * Free stuff common to all JCRs. N.B. Be careful to include only
335 * generic stuff in the common part of the jcr.
/*
 * Release the resources held in the daemon-independent part of a JCR:
 * its mutex, message queue, pool-memory strings, auth key, Director
 * socket, regex data and cached path. Pointers whose release is visible
 * below are set to NULL afterwards. Finally the calling thread's TSD slot
 * is set to INVALID_JCR.
 * NOTE(review): the guards around the attr, errmsg and id_list releases,
 * and the release of the JCR memory itself, are not visible in this excerpt.
 */
337 static void free_common_jcr(JCR *jcr)
339 jcr->destroy_mutex();
341 if (jcr->msg_queue) {
342 delete jcr->msg_queue;
343 jcr->msg_queue = NULL;
345 close_msg(jcr); /* close messages for this job */
347 /* do this after closing messages */
348 if (jcr->client_name) {
349 free_pool_memory(jcr->client_name);
350 jcr->client_name = NULL;
354 free_pool_memory(jcr->attr);
358 if (jcr->sd_auth_key) {
359 free(jcr->sd_auth_key);
360 jcr->sd_auth_key = NULL;
362 if (jcr->VolumeName) {
363 free_pool_memory(jcr->VolumeName);
364 jcr->VolumeName = NULL;
367 if (jcr->dir_bsock) {
368 bnet_close(jcr->dir_bsock);
369 jcr->dir_bsock = NULL;
372 free_pool_memory(jcr->errmsg);
379 if (jcr->RegexWhere) {
380 free(jcr->RegexWhere);
381 jcr->RegexWhere = NULL;
/* Elements are released by free_bregexps() before the list object is deleted. */
383 if (jcr->where_bregexp) {
384 free_bregexps(jcr->where_bregexp);
385 delete jcr->where_bregexp;
386 jcr->where_bregexp = NULL;
388 if (jcr->cached_path) {
389 free_pool_memory(jcr->cached_path);
390 jcr->cached_path = NULL;
394 free_guid_list(jcr->id_list);
397 /* Invalidate the tsd jcr data */
398 set_jcr_in_tsd(INVALID_JCR);
403 * Global routine to free a jcr
/*
 * Drop one reference to jcr and tear it down when it is no longer in use.
 * Two entry points are visible: b_free_jcr(file, line, jcr), which logs the
 * call site, and free_jcr(jcr). NOTE(review): they appear to be alternative
 * signatures selected by a preprocessor conditional that is not visible here.
 *
 * Visible sequence: flush queued messages, decrement the use count (a
 * negative count is reported with M_ERROR), test whether the JCR is still
 * in use, remove it from the chain, run the job-end callbacks, append a
 * summary to last_jobs for JobId > 0, call the daemon free routine, free
 * the common part, flush daemon messages and garbage-collect pool memory.
 * NOTE(review): chain/list locking, the early return for a JCR still in use
 * and the case labels of the JobType switch are not visible in this excerpt.
 */
406 void b_free_jcr(const char *file, int line, JCR *jcr)
408 struct s_last_job *je, last_job;
410 Dmsg3(dbglvl, "Enter free_jcr jid=%u from %s:%d\n", jcr->JobId, file, line);
414 void free_jcr(JCR *jcr)
416 struct s_last_job *je, last_job;
418 Dmsg3(dbglvl, "Enter free_jcr jid=%u use_count=%d Job=%s\n",
419 jcr->JobId, jcr->use_count(), jcr->Job);
423 dequeue_messages(jcr);
425 jcr->dec_use_count(); /* decrement use count */
426 if (jcr->use_count() < 0) {
427 Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
428 jcr->use_count(), jcr->JobId);
430 if (jcr->JobId > 0) {
431 Dmsg3(dbglvl, "Dec free_jcr jid=%u use_count=%d Job=%s\n",
432 jcr->JobId, jcr->use_count(), jcr->Job);
434 if (jcr->use_count() > 0) { /* if in use */
438 if (jcr->JobId > 0) {
439 Dmsg3(dbglvl, "remove jcr jid=%u use_count=%d Job=%s\n",
440 jcr->JobId, jcr->use_count(), jcr->Job);
442 remove_jcr(jcr); /* remove Jcr from chain */
444 job_end_pop(jcr); /* pop and call hooked routines */
446 Dmsg1(dbglvl, "End job=%d\n", jcr->JobId);
448 /* Keep some statistics */
449 switch (jcr->JobType) {
/* Snapshot the job's final counters into a stack copy. */
457 last_job.Errors = jcr->Errors;
458 last_job.JobType = jcr->JobType;
459 last_job.JobId = jcr->JobId;
460 last_job.VolSessionId = jcr->VolSessionId;
461 last_job.VolSessionTime = jcr->VolSessionTime;
462 bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
463 last_job.JobFiles = jcr->JobFiles;
464 last_job.JobBytes = jcr->JobBytes;
465 last_job.JobStatus = jcr->JobStatus;
466 last_job.JobLevel = jcr->JobLevel;
467 last_job.start_time = jcr->start_time;
468 last_job.end_time = time(NULL);
469 /* Keep list of last jobs, but not Console where JobId==0 */
470 if (last_job.JobId > 0) {
/* NOTE(review): the malloc() result is passed to memcpy() with no NULL check. */
471 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
472 memcpy((char *)je, (char *)&last_job, sizeof(last_job));
/* NOTE(review): presumably guarded by a check that last_jobs is still
 * unallocated; the guard is not visible here. */
474 init_last_jobs_list();
476 last_jobs->append(je);
/* Keep the list bounded: drop the front entry once over the limit. */
477 if (last_jobs->size() > max_last_jobs) {
478 je = (struct s_last_job *)last_jobs->first();
479 last_jobs->remove(je);
488 if (jcr->daemon_free_jcr) {
489 jcr->daemon_free_jcr(jcr); /* call daemon free routine */
493 free_common_jcr(jcr);
494 close_msg(NULL); /* flush any daemon messages */
495 garbage_collect_memory_pool();
496 Dmsg0(dbglvl, "Exit free_jcr\n");
/*
 * Store jcr in the calling thread's thread-specific data under jcr_key.
 * The Jmsg1 below reports a failure with M_ABORT; the status test guarding
 * it is not visible in this excerpt.
 */
499 void set_jcr_in_tsd(JCR *jcr)
501 int status = pthread_setspecific(jcr_key, (void *)jcr);
504 Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"), be.bstrerror(status));
/*
 * Fetch the JCR pointer stored in the calling thread's thread-specific data.
 * The INVALID_JCR sentinel is translated for callers (to NULL, per the
 * comment below; the assignment and return are not visible in this excerpt).
 */
508 JCR *get_jcr_from_tsd()
510 JCR *jcr = (JCR *)pthread_getspecific(jcr_key);
511 // printf("get_jcr_from_tsd: jcr=%p\n", jcr);
512 /* set any INVALID_JCR to NULL which the rest of Bacula understands */
513 if (jcr == INVALID_JCR) {
521 * Find which JobId corresponds to the current thread
/*
 * Return the JobId of the JCR stored in the calling thread's TSD.
 * NOTE(review): the value returned when no JCR is stored, and the check
 * guarding the dereference, are not visible in this excerpt.
 */
523 uint32_t get_jobid_from_tsd()
527 jcr = get_jcr_from_tsd();
528 // printf("get_jobid_from_tsr: jcr=%p\n", jcr);
530 JobId = (uint32_t)jcr->JobId;
536 * Given a JobId, find the JCR
537 * Returns: jcr on success
/*
 * Look up a JCR by JobId. On a match the JCR's use count is incremented
 * before it is handed back; presumably the caller releases it with
 * free_jcr() -- confirm. The chain traversal and return are not visible
 * in this excerpt.
 */
540 JCR *get_jcr_by_id(uint32_t JobId)
545 if (jcr->JobId == JobId) {
546 jcr->inc_use_count();
547 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
548 jcr->JobId, jcr->use_count(), jcr->Job);
557 * Given a SessionId and SessionTime, find the JCR
558 * Returns: jcr on success
/*
 * Look up a JCR whose VolSessionId and VolSessionTime both match.
 * On a match the JCR's use count is incremented before it is handed back;
 * presumably the caller releases it with free_jcr() -- confirm. The chain
 * traversal and return are not visible in this excerpt.
 */
561 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
566 if (jcr->VolSessionId == SessionId &&
567 jcr->VolSessionTime == SessionTime) {
568 jcr->inc_use_count();
569 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
570 jcr->JobId, jcr->use_count(), jcr->Job);
580 * Given a Job, find the JCR
581 * compares on the number of characters in Job
582 * thus allowing partial matches.
583 * Returns: jcr on success
/*
 * Look up a JCR whose Job name starts with the first `len` characters of Job.
 * NOTE(review): the computation of len is not visible here; presumably it is
 * the length of Job -- confirm.
 * On a match the JCR's use count is incremented before it is handed back.
 */
586 JCR *get_jcr_by_partial_name(char *Job)
596 if (strncmp(Job, jcr->Job, len) == 0) {
597 jcr->inc_use_count();
598 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
599 jcr->JobId, jcr->use_count(), jcr->Job);
609 * Given a Job, find the JCR
610 * requires an exact match of names.
611 * Returns: jcr on success
/*
 * Look up a JCR whose Job name equals Job exactly (strcmp).
 * On a match the JCR's use count is incremented before it is handed back;
 * presumably the caller releases it with free_jcr() -- confirm. The chain
 * traversal and return are not visible in this excerpt.
 */
614 JCR *get_jcr_by_full_name(char *Job)
622 if (strcmp(jcr->Job, Job) == 0) {
623 jcr->inc_use_count();
624 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
625 jcr->JobId, jcr->use_count(), jcr->Job);
/*
 * Set jcr->JobStatus to JobStatus, subject to the rules in the inline
 * comments below: certain error statuses already on the JCR are kept, and
 * jcr->wait_time is refreshed only when a wait state is newly entered.
 * NOTE(review): several switch headers, case labels and braces are not
 * visible in this excerpt, so the exact branch structure cannot be read
 * from here.
 */
633 void set_jcr_job_status(JCR *jcr, int JobStatus)
635 bool set_waittime=false;
636 Dmsg2(800, "set_jcr_job_status(%s, %c)\n", jcr->Job, JobStatus);
637 /* if wait state is new, we keep current time for watchdog MaxWaitTime */
643 case JS_WaitStoreRes:
645 case JS_WaitClientRes:
647 case JS_WaitPriority:
654 * For a set of errors, ... keep the current status
655 * so it isn't lost. For all others, set it.
657 Dmsg3(300, "jid=%u OnEntry JobStatus=%c set=%c\n", (uint32_t)jcr->JobId,
658 jcr->JobStatus, JobStatus);
/* Dispatch on the status currently stored in the JCR, not the new one. */
659 switch (jcr->JobStatus) {
660 case JS_ErrorTerminated:
667 case JS_ErrorTerminated:
670 /* Override more minor status */
671 jcr->JobStatus = JobStatus;
677 * For a set of Wait situation, keep old time.
683 case JS_WaitStoreRes:
685 case JS_WaitClientRes:
687 case JS_WaitPriority:
688 set_waittime = false; /* keep old time */
690 jcr->JobStatus = JobStatus;
/* NOTE(review): the comment below says "before JobStatus", yet in the lines
 * visible here the JobStatus assignment precedes it -- confirm intended order. */
692 /* set it before JobStatus */
693 Dmsg0(800, "Setting wait_time\n");
694 jcr->wait_time = time(NULL);
697 Dmsg3(100, "jid=%u leave set_jcr_job_status=%c set=%c\n", (uint32_t)jcr->JobId,
698 jcr->JobStatus, JobStatus);
/* Counter reported in the TRACE_JCR_CHAIN lock/unlock debug messages. */
701 #ifdef TRACE_JCR_CHAIN
702 static int lock_count = 0;
/*
 * Lock the JCR chain. With TRACE_JCR_CHAIN defined the function is
 * b_lock_jcr_chain(fname, line) and logs the call site; otherwise it is
 * lock_jcr_chain().
 * NOTE(review): the locking statement and the #else/#endif lines are not
 * visible in this excerpt.
 */
708 #ifdef TRACE_JCR_CHAIN
709 static void b_lock_jcr_chain(const char *fname, int line)
711 static void lock_jcr_chain()
714 #ifdef TRACE_JCR_CHAIN
/* NOTE(review): lock_count is only incremented as a side effect of this
 * argument; whether it is always evaluated depends on the Dmsg3 macro. */
715 Dmsg3(dbglvl, "Lock jcr chain %d from %s:%d\n", ++lock_count, fname, line);
/*
 * Unlock the JCR chain. With TRACE_JCR_CHAIN defined the function is
 * b_unlock_jcr_chain(fname, line) and logs the call site; otherwise it is
 * unlock_jcr_chain().
 * NOTE(review): the unlocking statement and the #else/#endif lines are not
 * visible in this excerpt.
 */
723 #ifdef TRACE_JCR_CHAIN
724 static void b_unlock_jcr_chain(const char *fname, int line)
726 static void unlock_jcr_chain()
729 #ifdef TRACE_JCR_CHAIN
/* NOTE(review): lock_count is only decremented as a side effect of this
 * argument; whether it is always evaluated depends on the Dmsg3 macro. */
730 Dmsg3(dbglvl, "Unlock jcr chain %d from %s:%d\n", lock_count--, fname, line);
737 * Start walk of jcr chain
738 * The proper way to walk the jcr chain is:
745 * It is possible to leave out the endeach_jcr(jcr), but
746 * in that case, the last jcr referenced must be explicitly
/*
 * Begin a walk of the JCR chain: take the first JCR and increment its use
 * count so it stays referenced while the caller works on it.
 * NOTE(review): chain locking, the NULL test for an empty chain and the
 * return statement are not visible in this excerpt.
 */
752 JCR *jcr_walk_start()
756 jcr = (JCR *)jcrs->first();
758 jcr->inc_use_count();
759 if (jcr->JobId > 0) {
760 Dmsg3(dbglvl, "Inc walk_start jid=%u use_count=%d Job=%s\n",
761 jcr->JobId, jcr->use_count(), jcr->Job);
769 * Get next jcr from chain, and release current one
/*
 * Advance a walk of the JCR chain: take the JCR following prev_jcr and
 * increment its use count.
 * NOTE(review): chain locking, the release of prev_jcr, the end-of-chain
 * test and the return statement are not visible in this excerpt.
 */
771 JCR *jcr_walk_next(JCR *prev_jcr)
776 jcr = (JCR *)jcrs->next(prev_jcr);
778 jcr->inc_use_count();
779 if (jcr->JobId > 0) {
780 Dmsg3(dbglvl, "Inc walk_next jid=%u use_count=%d Job=%s\n",
781 jcr->JobId, jcr->use_count(), jcr->Job);
792 * Release last jcr referenced
/*
 * Finish a walk of the JCR chain by releasing the last JCR referenced.
 * NOTE(review): only the debug message is visible here; the NULL test and
 * the release call itself are not visible in this excerpt.
 */
794 void jcr_walk_end(JCR *jcr)
797 if (jcr->JobId > 0) {
798 Dmsg3(dbglvl, "Free walk_end jid=%u use_count=%d Job=%s\n",
799 jcr->JobId, jcr->use_count(), jcr->Job);
807 * Setup to call the timeout check routine every 30 seconds
808 * This routine will check any timers that have been enabled.
/*
 * Create and register a repeating watchdog (one_shot = false) that calls
 * jcr_timeout_check() at a 30-unit interval (seconds, per the comment
 * preceding this function).
 * Returns bool; the return statement is not visible in this excerpt.
 */
810 bool init_jcr_subsystem(void)
812 watchdog_t *wd = new_watchdog();
814 wd->one_shot = false;
815 wd->interval = 30; /* FIXME: should be configurable somewhere, even
816 if only with a #define */
817 wd->callback = jcr_timeout_check;
819 register_watchdog(wd);
/*
 * Watchdog callback: for each JCR, check the timer on its Storage daemon,
 * File daemon and Director sockets. When a timer has been running longer
 * than the socket's timeout, the timer is switched off, an M_ERROR job
 * message is issued and TIMEOUT_SIGNAL is sent to the thread recorded in
 * jcr->my_thread_id.
 * NOTE(review): the chain traversal, the NULL tests on each socket and the
 * assignment of the Director socket are not visible in this excerpt.
 */
824 static void jcr_timeout_check(watchdog_t *self)
830 Dmsg0(dbglvl, "Start JCR timeout checks\n");
832 /* Walk through all JCRs checking if any one is
833 * blocked for more than specified max time.
/* NOTE(review): "0x%x" is given a pointer argument; %p is the matching
 * conversion and %x may truncate on 64-bit targets. */
836 Dmsg2(dbglvl, "jcr_timeout_check JobId=%u jcr=0x%x\n", jcr->JobId, jcr);
837 if (jcr->JobId == 0) {
840 fd = jcr->store_bsock;
842 timer_start = fd->timer_start;
/* timer_start == 0 means the timer is off. */
843 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
844 fd->timer_start = 0; /* turn off timer */
/* NOTE(review): %d is used for a difference involving time_t watchdog_time;
 * confirm the argument type matches the conversion. */
846 Jmsg(jcr, M_ERROR, 0, _(
847 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
848 watchdog_time - timer_start);
849 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
852 fd = jcr->file_bsock;
854 timer_start = fd->timer_start;
855 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
856 fd->timer_start = 0; /* turn off timer */
858 Jmsg(jcr, M_ERROR, 0, _(
859 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
860 watchdog_time - timer_start);
861 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
866 timer_start = fd->timer_start;
867 if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
868 fd->timer_start = 0; /* turn off timer */
870 Jmsg(jcr, M_ERROR, 0, _(
871 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
872 watchdog_time - timer_start);
873 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
879 Dmsg0(dbglvl, "Finished JCR timeout checks\n");
883 * Timeout signal comes here
/*
 * Handler for TIMEOUT_SIGNAL (installed by new_jcr() via sigaction).
 * It does no work of its own; it returns at once.
 */
885 extern "C" void timeout_handler(int sig)
887 return; /* thus interrupting the function */