2 Bacula® - The Network Backup Solution
4 Copyright (C) 2000-2008 Free Software Foundation Europe e.V.
6 The main author of Bacula is Kern Sibbald, with contributions from
7 many others, a complete list can be found in the file AUTHORS.
8 This program is Free Software; you can redistribute it and/or
9 modify it under the terms of version two of the GNU General Public
10 License as published by the Free Software Foundation and included
13 This program is distributed in the hope that it will be useful, but
14 WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; if not, write to the Free Software
20 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
23 Bacula® is a registered trademark of Kern Sibbald.
24 The licensor of Bacula is the Free Software Foundation Europe
25 (FSFE), Fiduciary Program, Sumatrastrasse 25, 8006 Zürich,
26 Switzerland, email:ftf@fsfeurope.org.
29 * Manipulation routines for Job Control Records and
30 * handling of last_jobs_list.
32 * Kern E. Sibbald, December 2000
36 * These routines are thread safe.
38 * The job list routines were re-written in May 2005 to
39 * eliminate the global lock while traversing the list, and
40 * to use the dlist subroutines. The locking is now done
41 * on the list each time the list is modified or traversed.
42 * That is, it is "micro-locked" rather than globally locked.
43 * The result is that there is one lock/unlock for each entry
44 * in the list while traversing it rather than a single lock
45 * at the beginning of a traversal and one at the end. This
46 * incurs slightly more overhead, but effectively eliminates
47 * the possibility of race conditions. In addition, with the
48 * exception of the global locking of the list during the
49 * re-reading of the config file, no recursion is needed.
56 const int dbglvl = 3400;
59 * Setting a NULL in tsd doesn't clear the tsd but instead tells
60 * pthreads not to call the tsd destructor. Consequently, we
61 * define this *invalid* jcr address and stuff it in the tsd
62 * when the jcr is no longer valid.
64 #define INVALID_JCR ((JCR *)(-1))
66 /* External variables we reference */
67 extern time_t watchdog_time;
69 /* External referenced functions */
70 void free_bregexps(alist *bregexps);
72 /* Forward referenced functions */
73 extern "C" void timeout_handler(int sig);
74 static void jcr_timeout_check(watchdog_t *self);
75 #ifdef TRACE_JCR_CHAIN
76 static void b_lock_jcr_chain(const char *filen, int line);
77 static void b_unlock_jcr_chain(const char *filen, int line);
78 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
79 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
81 static void lock_jcr_chain();
82 static void unlock_jcr_chain();
87 dlist *last_jobs = NULL;
88 const int max_last_jobs = 10;
90 static dlist *jcrs = NULL; /* JCR chain */
91 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
93 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
95 static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER;
97 static pthread_key_t jcr_key; /* Pointer to jcr for each thread */
99 pthread_once_t key_once = PTHREAD_ONCE_INIT;
112 void init_last_jobs_list()
115 struct s_last_job *job_entry = NULL;
117 last_jobs = New(dlist(job_entry, &job_entry->link));
120 jcrs = New(dlist(jcr, &jcr->link));
124 void term_last_jobs_list()
127 while (!last_jobs->empty()) {
128 void *je = last_jobs->first();
129 last_jobs->remove(je);
141 bool read_last_jobs_list(int fd, uint64_t addr)
143 struct s_last_job *je, job;
146 Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
147 if (addr == 0 || lseek(fd, (boffset_t)addr, SEEK_SET) < 0) {
150 if (read(fd, &num, sizeof(num)) != sizeof(num)) {
153 Dmsg1(100, "Read num_items=%d\n", num);
154 if (num > 4 * max_last_jobs) { /* sanity check */
157 for ( ; num; num--) {
158 if (read(fd, &job, sizeof(job)) != sizeof(job)) {
160 Pmsg1(000, "Read job entry. ERR=%s\n", be.bstrerror());
164 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
165 memcpy((char *)je, (char *)&job, sizeof(job));
167 init_last_jobs_list();
169 last_jobs->append(je);
170 if (last_jobs->size() > max_last_jobs) {
171 je = (struct s_last_job *)last_jobs->first();
172 last_jobs->remove(je);
180 uint64_t write_last_jobs_list(int fd, uint64_t addr)
182 struct s_last_job *je;
185 Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
186 if (lseek(fd, (boffset_t)addr, SEEK_SET) < 0) {
190 /* First record is number of entries */
191 num = last_jobs->size();
192 if (write(fd, &num, sizeof(num)) != sizeof(num)) {
194 Pmsg1(000, "Error writing num_items: ERR=%s\n", be.bstrerror());
197 foreach_dlist(je, last_jobs) {
198 if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
200 Pmsg1(000, "Error writing job: ERR=%s\n", be.bstrerror());
205 /* Return current address */
206 ssize_t stat = lseek(fd, 0, SEEK_CUR);
214 void lock_last_jobs_list()
219 void unlock_last_jobs_list()
224 /* Set Job type in JCR and also set appropriate read flag */
225 void JCR::set_JobType(int32_t JobType)
230 /* Set Job level in JCR and also set appropriate read flag */
231 void JCR::set_JobLevel(int32_t JobLevel)
233 m_JobLevel = JobLevel;
245 if (m_JobLevel == L_VIRTUAL_FULL) {
256 * Push a subroutine address into the job end callback stack
258 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
260 jcr->job_end_push.append((void *)job_end_cb);
261 jcr->job_end_push.append(ctx);
264 /* Pop each job_end subroutine and call it */
265 static void job_end_pop(JCR *jcr)
267 void (*job_end_cb)(JCR *jcr, void *ctx);
269 for (int i=jcr->job_end_push.size()-1; i > 0; ) {
270 ctx = jcr->job_end_push.get(i--);
271 job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
272 job_end_cb(jcr, ctx);
276 void create_jcr_key()
278 int status = pthread_key_create(&jcr_key, NULL);
281 Jmsg1(NULL, M_ABORT, 0, _("pthread key create failed: ERR=%s\n"),
282 be.bstrerror(status));
287 * Create a Job Control Record and link it into JCR chain
288 * Returns newly allocated JCR
289 * Note, since each daemon has a different JCR, he passes
292 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
295 MQUEUE_ITEM *item = NULL;
296 struct sigaction sigtimer;
299 Dmsg0(dbglvl, "Enter new_jcr\n");
300 status = pthread_once(&key_once, create_jcr_key);
303 Jmsg1(NULL, M_ABORT, 0, _("pthread_once failed. ERR=%s\n"), be.bstrerror(status));
305 jcr = (JCR *)malloc(size);
306 memset(jcr, 0, size);
307 jcr->my_thread_id = pthread_self();
308 jcr->msg_queue = New(dlist(item, &item->link));
309 jcr->job_end_push.init(1, false);
310 jcr->sched_time = time(NULL);
311 jcr->daemon_free_jcr = daemon_free_jcr; /* plug daemon free routine */
313 jcr->inc_use_count();
314 jcr->VolumeName = get_pool_memory(PM_FNAME);
315 jcr->VolumeName[0] = 0;
316 jcr->errmsg = get_pool_memory(PM_MESSAGE);
318 /* Setup some dummy values */
319 bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
321 jcr->set_JobType(JT_SYSTEM); /* internal job until defined */
322 jcr->set_JobLevel(L_NONE);
323 set_jcr_job_status(jcr, JS_Created); /* ready to run */
325 sigtimer.sa_flags = 0;
326 sigtimer.sa_handler = timeout_handler;
327 sigfillset(&sigtimer.sa_mask);
328 sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
331 * Locking jobs is a global lock that is needed
332 * so that the Director can stop new jobs from being
333 * added to the jcr chain while it processes a new
334 * conf file and does the job_end_push().
339 jcrs = New(dlist(jcr, &jcr->link));
350 * Remove a JCR from the chain
351 * NOTE! The chain must be locked prior to calling
354 static void remove_jcr(JCR *jcr)
356 Dmsg0(dbglvl, "Enter remove_jcr\n");
358 Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
361 Dmsg0(dbglvl, "Leave remove_jcr\n");
365 * Free stuff common to all JCRs. N.B. Be careful to include only
366 * generic stuff in the common part of the jcr.
368 static void free_common_jcr(JCR *jcr)
370 jcr->destroy_mutex();
372 if (jcr->msg_queue) {
373 delete jcr->msg_queue;
374 jcr->msg_queue = NULL;
376 close_msg(jcr); /* close messages for this job */
378 /* do this after closing messages */
379 if (jcr->client_name) {
380 free_pool_memory(jcr->client_name);
381 jcr->client_name = NULL;
385 free_pool_memory(jcr->attr);
389 if (jcr->sd_auth_key) {
390 free(jcr->sd_auth_key);
391 jcr->sd_auth_key = NULL;
393 if (jcr->VolumeName) {
394 free_pool_memory(jcr->VolumeName);
395 jcr->VolumeName = NULL;
398 if (jcr->dir_bsock) {
399 bnet_close(jcr->dir_bsock);
400 jcr->dir_bsock = NULL;
403 free_pool_memory(jcr->errmsg);
410 if (jcr->RegexWhere) {
411 free(jcr->RegexWhere);
412 jcr->RegexWhere = NULL;
414 if (jcr->where_bregexp) {
415 free_bregexps(jcr->where_bregexp);
416 delete jcr->where_bregexp;
417 jcr->where_bregexp = NULL;
419 if (jcr->cached_path) {
420 free_pool_memory(jcr->cached_path);
421 jcr->cached_path = NULL;
425 free_guid_list(jcr->id_list);
428 /* Invalidate the tsd jcr data */
429 set_jcr_in_tsd(INVALID_JCR);
434 * Global routine to free a jcr
437 void b_free_jcr(const char *file, int line, JCR *jcr)
439 struct s_last_job *je;
441 Dmsg3(dbglvl, "Enter free_jcr jid=%u from %s:%d\n", jcr->JobId, file, line);
445 void free_jcr(JCR *jcr)
447 struct s_last_job *je;
449 Dmsg3(dbglvl, "Enter free_jcr jid=%u use_count=%d Job=%s\n",
450 jcr->JobId, jcr->use_count(), jcr->Job);
454 dequeue_messages(jcr);
456 jcr->dec_use_count(); /* decrement use count */
457 if (jcr->use_count() < 0) {
458 Jmsg2(jcr, M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
459 jcr->use_count(), jcr->JobId);
461 if (jcr->JobId > 0) {
462 Dmsg3(dbglvl, "Dec free_jcr jid=%u use_count=%d Job=%s\n",
463 jcr->JobId, jcr->use_count(), jcr->Job);
465 if (jcr->use_count() > 0) { /* if in use */
469 if (jcr->JobId > 0) {
470 Dmsg3(dbglvl, "remove jcr jid=%u use_count=%d Job=%s\n",
471 jcr->JobId, jcr->use_count(), jcr->Job);
473 remove_jcr(jcr); /* remove Jcr from chain */
475 job_end_pop(jcr); /* pop and call hooked routines */
477 Dmsg1(dbglvl, "End job=%d\n", jcr->JobId);
479 /* Keep some statistics */
480 switch (jcr->get_JobType()) {
487 /* Keep list of last jobs, but not Console where JobId==0 */
488 if (jcr->JobId > 0) {
490 je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
491 memset(je, 0, sizeof(struct s_last_job)); /* zero in case unset fields */
492 je->Errors = jcr->Errors;
493 je->JobType = jcr->get_JobType();
494 je->JobId = jcr->JobId;
495 je->VolSessionId = jcr->VolSessionId;
496 je->VolSessionTime = jcr->VolSessionTime;
497 bstrncpy(je->Job, jcr->Job, sizeof(je->Job));
498 je->JobFiles = jcr->JobFiles;
499 je->JobBytes = jcr->JobBytes;
500 je->JobStatus = jcr->JobStatus;
501 je->JobLevel = jcr->get_JobLevel();
502 je->start_time = jcr->start_time;
503 je->end_time = time(NULL);
506 init_last_jobs_list();
508 last_jobs->append(je);
509 if (last_jobs->size() > max_last_jobs) {
510 je = (struct s_last_job *)last_jobs->first();
511 last_jobs->remove(je);
520 if (jcr->daemon_free_jcr) {
521 jcr->daemon_free_jcr(jcr); /* call daemon free routine */
525 free_common_jcr(jcr);
526 close_msg(NULL); /* flush any daemon messages */
527 garbage_collect_memory_pool();
528 Dmsg0(dbglvl, "Exit free_jcr\n");
531 void set_jcr_in_tsd(JCR *jcr)
533 int status = pthread_setspecific(jcr_key, (void *)jcr);
536 Jmsg1(jcr, M_ABORT, 0, _("pthread_setspecific failed: ERR=%s\n"), be.bstrerror(status));
540 JCR *get_jcr_from_tsd()
542 JCR *jcr = (JCR *)pthread_getspecific(jcr_key);
543 // printf("get_jcr_from_tsd: jcr=%p\n", jcr);
544 /* set any INVALID_JCR to NULL which the rest of Bacula understands */
545 if (jcr == INVALID_JCR) {
553 * Find which JobId corresponds to the current thread
555 uint32_t get_jobid_from_tsd()
559 jcr = get_jcr_from_tsd();
560 // printf("get_jobid_from_tsd: jcr=%p\n", jcr);
562 JobId = (uint32_t)jcr->JobId;
568 * Given a JobId, find the JCR
569 * Returns: jcr on success
572 JCR *get_jcr_by_id(uint32_t JobId)
577 if (jcr->JobId == JobId) {
578 jcr->inc_use_count();
579 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
580 jcr->JobId, jcr->use_count(), jcr->Job);
589 * Given a SessionId and SessionTime, find the JCR
590 * Returns: jcr on success
593 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
598 if (jcr->VolSessionId == SessionId &&
599 jcr->VolSessionTime == SessionTime) {
600 jcr->inc_use_count();
601 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
602 jcr->JobId, jcr->use_count(), jcr->Job);
612 * Given a Job, find the JCR
613 * compares on the number of characters in Job
614 * thus allowing partial matches.
615 * Returns: jcr on success
618 JCR *get_jcr_by_partial_name(char *Job)
628 if (strncmp(Job, jcr->Job, len) == 0) {
629 jcr->inc_use_count();
630 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
631 jcr->JobId, jcr->use_count(), jcr->Job);
641 * Given a Job, find the JCR
642 * requires an exact match of names.
643 * Returns: jcr on success
646 JCR *get_jcr_by_full_name(char *Job)
654 if (strcmp(jcr->Job, Job) == 0) {
655 jcr->inc_use_count();
656 Dmsg3(dbglvl, "Inc get_jcr jid=%u use_count=%d Job=%s\n",
657 jcr->JobId, jcr->use_count(), jcr->Job);
665 void set_jcr_job_status(JCR *jcr, int JobStatus)
667 bool set_waittime=false;
668 Dmsg2(800, "set_jcr_job_status(%s, %c)\n", jcr->Job, JobStatus);
669 /* if wait state is new, we keep current time for watchdog MaxWaitTime */
675 case JS_WaitStoreRes:
677 case JS_WaitClientRes:
679 case JS_WaitPriority:
686 * For a set of errors, ... keep the current status
687 * so it isn't lost. For all others, set it.
689 Dmsg3(300, "jid=%u OnEntry JobStatus=%c set=%c\n", (uint32_t)jcr->JobId,
690 jcr->JobStatus, JobStatus);
691 switch (jcr->JobStatus) {
692 case JS_ErrorTerminated:
699 case JS_ErrorTerminated:
702 /* Override more minor status */
703 jcr->JobStatus = JobStatus;
709 * For a set of Wait situations, keep the old time.
715 case JS_WaitStoreRes:
717 case JS_WaitClientRes:
719 case JS_WaitPriority:
720 set_waittime = false; /* keep old time */
722 jcr->JobStatus = JobStatus;
724 /* set it before JobStatus */
725 Dmsg0(800, "Setting wait_time\n");
726 jcr->wait_time = time(NULL);
729 Dmsg3(200, "jid=%u leave set_jcr_job_status=%c set=%c\n", (uint32_t)jcr->JobId,
730 jcr->JobStatus, JobStatus);
733 #ifdef TRACE_JCR_CHAIN
734 static int lock_count = 0;
740 #ifdef TRACE_JCR_CHAIN
741 static void b_lock_jcr_chain(const char *fname, int line)
743 static void lock_jcr_chain()
746 #ifdef TRACE_JCR_CHAIN
747 Dmsg3(dbglvl, "Lock jcr chain %d from %s:%d\n", ++lock_count, fname, line);
755 #ifdef TRACE_JCR_CHAIN
756 static void b_unlock_jcr_chain(const char *fname, int line)
758 static void unlock_jcr_chain()
761 #ifdef TRACE_JCR_CHAIN
762 Dmsg3(dbglvl, "Unlock jcr chain %d from %s:%d\n", lock_count--, fname, line);
769 * Start walk of jcr chain
770 * The proper way to walk the jcr chain is:
777 * It is possible to leave out the endeach_jcr(jcr), but
778 * in that case, the last jcr referenced must be explicitly
784 JCR *jcr_walk_start()
788 jcr = (JCR *)jcrs->first();
790 jcr->inc_use_count();
791 if (jcr->JobId > 0) {
792 Dmsg3(dbglvl, "Inc walk_start jid=%u use_count=%d Job=%s\n",
793 jcr->JobId, jcr->use_count(), jcr->Job);
801 * Get next jcr from chain, and release current one
803 JCR *jcr_walk_next(JCR *prev_jcr)
808 jcr = (JCR *)jcrs->next(prev_jcr);
810 jcr->inc_use_count();
811 if (jcr->JobId > 0) {
812 Dmsg3(dbglvl, "Inc walk_next jid=%u use_count=%d Job=%s\n",
813 jcr->JobId, jcr->use_count(), jcr->Job);
824 * Release last jcr referenced
826 void jcr_walk_end(JCR *jcr)
829 if (jcr->JobId > 0) {
830 Dmsg3(dbglvl, "Free walk_end jid=%u use_count=%d Job=%s\n",
831 jcr->JobId, jcr->use_count(), jcr->Job);
839 * Setup to call the timeout check routine every 30 seconds
840 * This routine will check any timers that have been enabled.
842 bool init_jcr_subsystem(void)
844 watchdog_t *wd = new_watchdog();
846 wd->one_shot = false;
847 wd->interval = 30; /* FIXME: should be configurable somewhere, even
848 if only with a #define */
849 wd->callback = jcr_timeout_check;
851 register_watchdog(wd);
856 static void jcr_timeout_check(watchdog_t *self)
862 Dmsg0(dbglvl, "Start JCR timeout checks\n");
864 /* Walk through all JCRs checking if any one is
865 * blocked for more than specified max time.
868 Dmsg2(dbglvl, "jcr_timeout_check JobId=%u jcr=0x%x\n", jcr->JobId, jcr);
869 if (jcr->JobId == 0) {
872 bs = jcr->store_bsock;
874 timer_start = bs->timer_start;
875 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
876 bs->timer_start = 0; /* turn off timer */
878 Qmsg(jcr, M_ERROR, 0, _(
879 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
880 watchdog_time - timer_start);
881 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
884 bs = jcr->file_bsock;
886 timer_start = bs->timer_start;
887 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
888 bs->timer_start = 0; /* turn off timer */
890 Qmsg(jcr, M_ERROR, 0, _(
891 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
892 watchdog_time - timer_start);
893 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
898 timer_start = bs->timer_start;
899 if (timer_start && (watchdog_time - timer_start) > bs->timeout) {
900 bs->timer_start = 0; /* turn off timer */
902 Qmsg(jcr, M_ERROR, 0, _(
903 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
904 watchdog_time - timer_start);
905 pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
911 Dmsg0(dbglvl, "Finished JCR timeout checks\n");
915 * Timeout signal comes here
917 extern "C" void timeout_handler(int sig)
919 return; /* thus interrupting the function */