]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/jcr.c
kes Attempt to fix problems with the msg_queue crashing on Solaris
[bacula/bacula] / bacula / src / lib / jcr.c
1 /*
2  * Manipulation routines for Job Control Records and
3  *  handling of last_jobs_list.
4  *
5  *  Kern E. Sibbald, December 2000
6  *
7  *  Version $Id$
8  *
9  *  These routines are thread safe.
10  *
11  *  The job list routines were re-written in May 2005 to
12  *  eliminate the global lock while traversing the list, and
13  *  to use the dlist subroutines.  The locking is now done
14  *  on the list each time the list is modified or traversed.
15  *  That is it is "micro-locked" rather than globally locked.
16  *  The result is that there is one lock/unlock for each entry
17  *  in the list while traversing it rather than a single lock
18  *  at the beginning of a traversal and one at the end.  This
19  *  incurs slightly more overhead, but effectively eliminates 
20  *  the possibilty of race conditions.  In addition, with the
21  *  exception of the global locking of the list during the
22  *  re-reading of the config file, no recursion is needed.
23  *
24  */
25 /*
26    Copyright (C) 2000-2006 Kern Sibbald
27
28    This program is free software; you can redistribute it and/or
29    modify it under the terms of the GNU General Public License
30    version 2 as amended with additional clauses defined in the
31    file LICENSE in the main source directory.
32
33    This program is distributed in the hope that it will be useful,
34    but WITHOUT ANY WARRANTY; without even the implied warranty of
35    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
36    the file LICENSE for additional details.
37
38  */
39
40 #include "bacula.h"
41 #include "jcr.h"
42
43 /* External variables we reference */
44 extern time_t watchdog_time;
45
46 /* Forward referenced functions */
47 extern "C" void timeout_handler(int sig);
48 static void jcr_timeout_check(watchdog_t *self);
49 #ifdef TRACE_JCR_CHAIN
50 static void b_lock_jcr_chain(const char *filen, int line);
51 static void b_unlock_jcr_chain(const char *filen, int line);
52 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
53 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
54 #else
55 static void lock_jcr_chain();
56 static void unlock_jcr_chain();
57 #endif
58
59
60 int num_jobs_run;
61 dlist *last_jobs = NULL;
62 const int max_last_jobs = 10;
63  
64 static dlist *jcrs = NULL;            /* JCR chain */
65 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
66
67 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
68
69 static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER;
70
71
72 void lock_jobs()
73 {
74    P(job_start_mutex);
75 }
76
77 void unlock_jobs()
78 {
79    V(job_start_mutex);
80 }
81
82 void init_last_jobs_list()
83 {
84    JCR *jcr = NULL;
85    struct s_last_job *job_entry = NULL;
86    if (!last_jobs) {
87       last_jobs = New(dlist(job_entry, &job_entry->link));
88    }
89    if (!jcrs) {
90       jcrs = New(dlist(jcr, &jcr->link));
91    }
92 }
93
94 void term_last_jobs_list()
95 {
96    if (last_jobs) {
97       while (!last_jobs->empty()) {
98          void *je = last_jobs->first();
99          last_jobs->remove(je);
100          free(je);
101       }
102       delete last_jobs;
103       last_jobs = NULL;
104    }
105    if (jcrs) {
106       delete jcrs;
107       jcrs = NULL;
108    }
109 }
110
111 bool read_last_jobs_list(int fd, uint64_t addr)
112 {
113    struct s_last_job *je, job;
114    uint32_t num;
115
116    Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
117    if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
118       return false;
119    }
120    if (read(fd, &num, sizeof(num)) != sizeof(num)) {
121       return false;
122    }
123    Dmsg1(100, "Read num_items=%d\n", num);
124    if (num > 4 * max_last_jobs) {  /* sanity check */
125       return false;
126    }
127    for ( ; num; num--) {
128       if (read(fd, &job, sizeof(job)) != sizeof(job)) {
129          Dmsg1(000, "Read job entry. ERR=%s\n", strerror(errno));
130          return false;
131       }
132       if (job.JobId > 0) {
133          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
134          memcpy((char *)je, (char *)&job, sizeof(job));
135          if (!last_jobs) {
136             init_last_jobs_list();
137          }
138          last_jobs->append(je);
139          if (last_jobs->size() > max_last_jobs) {
140             je = (struct s_last_job *)last_jobs->first();
141             last_jobs->remove(je);
142             free(je);
143          }
144       }
145    }
146    return true;
147 }
148
149 uint64_t write_last_jobs_list(int fd, uint64_t addr)
150 {
151    struct s_last_job *je;
152    uint32_t num;
153
154    Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
155    if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
156       return 0;
157    }
158    if (last_jobs) {
159       /* First record is number of entires */
160       num = last_jobs->size();
161       if (write(fd, &num, sizeof(num)) != sizeof(num)) {
162          Dmsg1(000, "Error writing num_items: ERR=%s\n", strerror(errno));
163          return 0;
164       }
165       foreach_dlist(je, last_jobs) {
166          if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
167             Dmsg1(000, "Error writing job: ERR=%s\n", strerror(errno));
168             return 0;
169          }
170       }
171    }
172    /* Return current address */
173    ssize_t stat = lseek(fd, 0, SEEK_CUR);
174    if (stat < 0) {
175       stat = 0;
176    }
177    return stat;
178
179 }
180
181 void lock_last_jobs_list()
182 {
183    P(last_jobs_mutex);
184 }
185
186 void unlock_last_jobs_list()
187 {
188    V(last_jobs_mutex);
189 }
190
191 /*
192  * Push a subroutine address into the job end callback stack
193  */
194 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
195 {
196    jcr->job_end_push.append((void *)job_end_cb);
197    jcr->job_end_push.append(ctx);
198 }
199
200 /* Pop each job_end subroutine and call it */
201 static void job_end_pop(JCR *jcr)
202 {
203    void (*job_end_cb)(JCR *jcr, void *ctx);
204    void *ctx;
205    for (int i=jcr->job_end_push.size()-1; i > 0; ) {
206       ctx = jcr->job_end_push.get(i--);
207       job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
208       job_end_cb(jcr, ctx);
209    }
210 }
211
212 /*
213  * Create a Job Control Record and link it into JCR chain
214  * Returns newly allocated JCR
215  * Note, since each daemon has a different JCR, he passes
216  *  us the size.
217  */
218 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
219 {
220    JCR *jcr;
221    MQUEUE_ITEM *item = NULL;
222    struct sigaction sigtimer;
223
224    Dmsg0(3400, "Enter new_jcr\n");
225    jcr = (JCR *)malloc(size);
226    memset(jcr, 0, size);
227    jcr->my_thread_id = pthread_self();
228    jcr->msg_queue = New(dlist(item, &item->link));
229    jcr->job_end_push.init(1, false);
230    jcr->sched_time = time(NULL);
231    jcr->daemon_free_jcr = daemon_free_jcr;    /* plug daemon free routine */
232    jcr->init_mutex();
233    jcr->inc_use_count();   
234    jcr->VolumeName = get_pool_memory(PM_FNAME);
235    jcr->VolumeName[0] = 0;
236    jcr->errmsg = get_pool_memory(PM_MESSAGE);
237    jcr->errmsg[0] = 0;
238    /* Setup some dummy values */
239    bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
240    jcr->JobId = 0;
241    jcr->JobType = JT_SYSTEM;          /* internal job until defined */
242    jcr->JobLevel = L_NONE;
243    set_jcr_job_status(jcr, JS_Created);       /* ready to run */
244
245    sigtimer.sa_flags = 0;
246    sigtimer.sa_handler = timeout_handler;
247    sigfillset(&sigtimer.sa_mask);
248    sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
249
250    /*
251     * Locking jobs is a global lock that is needed
252     * so that the Director can stop new jobs from being
253     * added to the jcr chain while it processes a new
254     * conf file and does the job_end_push().
255     */
256    lock_jobs();
257    lock_jcr_chain();
258    if (!jcrs) {
259       jcrs = New(dlist(jcr, &jcr->link));
260    }
261    jcrs->append(jcr);
262    unlock_jcr_chain();
263    unlock_jobs();
264
265    return jcr;
266 }
267
268
269 /*
270  * Remove a JCR from the chain
271  * NOTE! The chain must be locked prior to calling
272  *       this routine.
273  */
274 static void remove_jcr(JCR *jcr)
275 {
276    Dmsg0(3400, "Enter remove_jcr\n");
277    if (!jcr) {
278       Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
279    }
280    jcrs->remove(jcr);
281    Dmsg0(3400, "Leave remove_jcr\n");
282 }
283
284 /*
285  * Free stuff common to all JCRs.  N.B. Be careful to include only
286  *  generic stuff in the common part of the jcr.
287  */
288 static void free_common_jcr(JCR *jcr)
289 {
290    struct s_last_job *je, last_job;
291
292    /* Keep some statistics */
293    switch (jcr->JobType) {
294    case JT_BACKUP:
295    case JT_VERIFY:
296    case JT_RESTORE:
297    case JT_MIGRATE:
298    case JT_COPY:
299    case JT_ADMIN:
300       num_jobs_run++;
301       last_job.Errors = jcr->Errors;
302       last_job.JobType = jcr->JobType;
303       last_job.JobId = jcr->JobId;
304       last_job.VolSessionId = jcr->VolSessionId;
305       last_job.VolSessionTime = jcr->VolSessionTime;
306       bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
307       last_job.JobFiles = jcr->JobFiles;
308       last_job.JobBytes = jcr->JobBytes;
309       last_job.JobStatus = jcr->JobStatus;
310       last_job.JobLevel = jcr->JobLevel;
311       last_job.start_time = jcr->start_time;
312       last_job.end_time = time(NULL);
313       /* Keep list of last jobs, but not Console where JobId==0 */
314       if (last_job.JobId > 0) {
315          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
316          memcpy((char *)je, (char *)&last_job, sizeof(last_job));
317          if (!last_jobs) {
318             init_last_jobs_list();
319          }
320          last_jobs->append(je);
321          if (last_jobs->size() > max_last_jobs) {
322             je = (struct s_last_job *)last_jobs->first();
323             last_jobs->remove(je);
324             free(je);
325          }
326       }
327       break;
328    default:
329       break;
330    }
331    jcr->destroy_mutex();
332
333    if (jcr->msg_queue) {
334       delete jcr->msg_queue;
335       jcr->msg_queue = NULL;
336    }
337    close_msg(jcr);                    /* close messages for this job */
338
339    /* do this after closing messages */
340    if (jcr->client_name) {
341       free_pool_memory(jcr->client_name);
342       jcr->client_name = NULL;
343    }
344
345    if (jcr->attr) {
346       free_pool_memory(jcr->attr);
347       jcr->attr = NULL;
348    }
349
350    if (jcr->sd_auth_key) {
351       free(jcr->sd_auth_key);
352       jcr->sd_auth_key = NULL;
353    }
354    if (jcr->VolumeName) {
355       free_pool_memory(jcr->VolumeName);
356       jcr->VolumeName = NULL;
357    }
358
359    if (jcr->dir_bsock) {
360       bnet_close(jcr->dir_bsock);
361       jcr->dir_bsock = NULL;
362    }
363    if (jcr->errmsg) {
364       free_pool_memory(jcr->errmsg);
365       jcr->errmsg = NULL;
366    }
367    if (jcr->where) {
368       free(jcr->where);
369       jcr->where = NULL;
370    }
371    if (jcr->cached_path) {
372       free_pool_memory(jcr->cached_path);
373       jcr->cached_path = NULL;
374       jcr->cached_pnl = 0;
375    }
376    free_getuser_cache();
377    free_getgroup_cache();
378    free(jcr);
379 }
380
381 /*
382  * Global routine to free a jcr
383  */
384 #ifdef DEBUG
385 void b_free_jcr(const char *file, int line, JCR *jcr)
386 {
387    Dmsg3(3400, "Enter free_jcr 0x%x from %s:%d\n", jcr, file, line);
388
389 #else
390
391 void free_jcr(JCR *jcr)
392 {
393
394    Dmsg2(3400, "Enter free_jcr 0x%x job=%d\n", jcr, jcr->JobId);
395
396 #endif
397
398    dequeue_messages(jcr);
399    lock_jcr_chain();
400    jcr->dec_use_count();              /* decrement use count */
401    if (jcr->use_count() < 0) {
402       Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
403          jcr->use_count(), jcr->JobId);
404    }
405    Dmsg3(3400, "Dec free_jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count(), jcr->JobId);
406    if (jcr->use_count() > 0) {          /* if in use */
407       unlock_jcr_chain();
408       Dmsg3(3400, "free_jcr 0x%x job=%d use_count=%d\n", jcr, jcr->JobId, jcr->use_count());
409       return;
410    }
411
412    remove_jcr(jcr);                   /* remove Jcr from chain */
413    unlock_jcr_chain();
414
415    job_end_pop(jcr);                  /* pop and call hooked routines */
416
417    Dmsg1(3400, "End job=%d\n", jcr->JobId);
418    if (jcr->daemon_free_jcr) {
419       jcr->daemon_free_jcr(jcr);      /* call daemon free routine */
420    }
421    free_common_jcr(jcr);
422    close_msg(NULL);                   /* flush any daemon messages */
423    garbage_collect_memory_pool();
424    Dmsg0(3400, "Exit free_jcr\n");
425 }
426
427
428 /*
429  * Given a JobId, find the JCR
430  *   Returns: jcr on success
431  *            NULL on failure
432  */
433 JCR *get_jcr_by_id(uint32_t JobId)
434 {
435    JCR *jcr;
436
437    foreach_jcr(jcr) {
438       if (jcr->JobId == JobId) {
439          jcr->inc_use_count();
440          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
441          break;
442       }
443    }
444    endeach_jcr(jcr);
445    return jcr;
446 }
447
448 /*
449  * Given a SessionId and SessionTime, find the JCR
450  *   Returns: jcr on success
451  *            NULL on failure
452  */
453 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
454 {
455    JCR *jcr;
456
457    foreach_jcr(jcr) {
458       if (jcr->VolSessionId == SessionId &&
459           jcr->VolSessionTime == SessionTime) {
460          jcr->inc_use_count();
461          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
462          break;
463       }
464    }
465    endeach_jcr(jcr);
466    return jcr;
467 }
468
469
470 /*
471  * Given a Job, find the JCR
472  *  compares on the number of characters in Job
473  *  thus allowing partial matches.
474  *   Returns: jcr on success
475  *            NULL on failure
476  */
477 JCR *get_jcr_by_partial_name(char *Job)
478 {
479    JCR *jcr;
480    int len;
481
482    if (!Job) {
483       return NULL;
484    }
485    len = strlen(Job);
486    foreach_jcr(jcr) {
487       if (strncmp(Job, jcr->Job, len) == 0) {
488          jcr->inc_use_count();
489          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
490          break;
491       }
492    }
493    endeach_jcr(jcr);
494    return jcr;
495 }
496
497
498 /*
499  * Given a Job, find the JCR
500  *  requires an exact match of names.
501  *   Returns: jcr on success
502  *            NULL on failure
503  */
504 JCR *get_jcr_by_full_name(char *Job)
505 {
506    JCR *jcr;
507
508    if (!Job) {
509       return NULL;
510    }
511    foreach_jcr(jcr) {
512       if (strcmp(jcr->Job, Job) == 0) {
513          jcr->inc_use_count();
514          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
515          break;
516       }
517    }
518    endeach_jcr(jcr);
519    return jcr;
520 }
521
522 void set_jcr_job_status(JCR *jcr, int JobStatus)
523 {
524    /*
525     * For a set of errors, ... keep the current status
526     *   so it isn't lost. For all others, set it.
527     */
528    switch (jcr->JobStatus) {
529    case JS_ErrorTerminated:
530    case JS_Error:
531    case JS_FatalError:
532    case JS_Differences:
533    case JS_Canceled:
534       break;
535    default:
536       jcr->JobStatus = JobStatus;
537    }
538 }
539
540 #ifdef TRACE_JCR_CHAIN
541 static int lock_count = 0;
542 #endif
543
544 /*
545  * Lock the chain
546  */
547 #ifdef TRACE_JCR_CHAIN
548 static void b_lock_jcr_chain(const char *fname, int line)
549 #else
550 static void lock_jcr_chain()
551 #endif
552 {
553 #ifdef TRACE_JCR_CHAIN
554    Dmsg3(3400, "Lock jcr chain %d from %s:%d\n", ++lock_count,
555       fname, line);
556 #endif
557    P(jcr_lock);
558 }
559
560 /*
561  * Unlock the chain
562  */
563 #ifdef TRACE_JCR_CHAIN
564 static void b_unlock_jcr_chain(const char *fname, int line)
565 #else
566 static void unlock_jcr_chain()
567 #endif
568 {
569 #ifdef TRACE_JCR_CHAIN
570    Dmsg3(3400, "Unlock jcr chain %d from %s:%d\n", lock_count--,
571       fname, line);
572 #endif
573    V(jcr_lock);
574 }
575
576
577 /*
578  * Start walk of jcr chain
579  * The proper way to walk the jcr chain is:
580  *    JCR *jcr;
581  *    foreach_jcr(jcr) {
582  *      ...
583  *    }
584  *    endeach_jcr(jcr);
585  *
586  *  It is possible to leave out the endeach_jcr(jcr), but
587  *   in that case, the last jcr referenced must be explicitly
588  *   released with:
589  *
590  *    free_jcr(jcr);
591  *  
592  */
593 JCR *jcr_walk_start() 
594 {
595    JCR *jcr;
596    lock_jcr_chain();
597    jcr = (JCR *)jcrs->first();
598    if (jcr) {
599       jcr->inc_use_count();
600       Dmsg3(3400, "Inc jcr_walk_start 0x%x job=%d use_count=%d\n", jcr, 
601             jcr->JobId, jcr->use_count());
602    }
603    unlock_jcr_chain();
604    return jcr;
605 }
606
607 /*
608  * Get next jcr from chain, and release current one
609  */
610 JCR *jcr_walk_next(JCR *prev_jcr)
611 {
612    JCR *jcr;
613
614    lock_jcr_chain();
615    jcr = (JCR *)jcrs->next(prev_jcr);
616    if (jcr) {
617       jcr->inc_use_count();
618       Dmsg3(3400, "Inc jcr_walk_next 0x%x job=%d use_count=%d\n", jcr, 
619          jcr->JobId, jcr->use_count());
620    }
621    unlock_jcr_chain();
622    if (prev_jcr) {
623       free_jcr(prev_jcr);
624    }
625    return jcr;
626 }
627
628 /*
629  * Release last jcr referenced
630  */
631 void jcr_walk_end(JCR *jcr)
632 {
633    if (jcr) {
634       free_jcr(jcr);
635    }
636 }
637
638
639 /*
640  * Setup to call the timeout check routine every 30 seconds
641  *  This routine will check any timers that have been enabled.
642  */
643 bool init_jcr_subsystem(void)
644 {
645    watchdog_t *wd = new_watchdog();
646
647    wd->one_shot = false;
648    wd->interval = 30;   /* FIXME: should be configurable somewhere, even
649                          if only with a #define */
650    wd->callback = jcr_timeout_check;
651
652    register_watchdog(wd);
653
654    return true;
655 }
656
657 static void jcr_timeout_check(watchdog_t *self)
658 {
659    JCR *jcr;
660    BSOCK *fd;
661    time_t timer_start;
662
663    Dmsg0(3400, "Start JCR timeout checks\n");
664
665    /* Walk through all JCRs checking if any one is
666     * blocked for more than specified max time.
667     */
668    foreach_jcr(jcr) {
669       Dmsg2(3400, "jcr_timeout_check JobId=%u jcr=0x%x\n", jcr->JobId, jcr);
670       if (jcr->JobId == 0) {
671          continue;
672       }
673       fd = jcr->store_bsock;
674       if (fd) {
675          timer_start = fd->timer_start;
676          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
677             fd->timer_start = 0;      /* turn off timer */
678             fd->timed_out = true;
679             Jmsg(jcr, M_ERROR, 0, _(
680 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
681                  watchdog_time - timer_start);
682             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
683          }
684       }
685       fd = jcr->file_bsock;
686       if (fd) {
687          timer_start = fd->timer_start;
688          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
689             fd->timer_start = 0;      /* turn off timer */
690             fd->timed_out = true;
691             Jmsg(jcr, M_ERROR, 0, _(
692 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
693                  watchdog_time - timer_start);
694             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
695          }
696       }
697       fd = jcr->dir_bsock;
698       if (fd) {
699          timer_start = fd->timer_start;
700          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
701             fd->timer_start = 0;      /* turn off timer */
702             fd->timed_out = true;
703             Jmsg(jcr, M_ERROR, 0, _(
704 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
705                  watchdog_time - timer_start);
706             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
707          }
708       }
709    }
710    endeach_jcr(jcr);
711
712    Dmsg0(3400, "Finished JCR timeout checks\n");
713 }
714
715 /*
716  * Timeout signal comes here
717  */
718 extern "C" void timeout_handler(int sig)
719 {
720    return;                            /* thus interrupting the function */
721 }