]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/jcr.c
This commit was manufactured by cvs2svn to create tag
[bacula/bacula] / bacula / src / lib / jcr.c
1 /*
2  * Manipulation routines for Job Control Records and
3  *  handling of last_jobs_list.
4  *
5  *  Kern E. Sibbald, December 2000
6  *
7  *  Version $Id$
8  *
9  *  These routines are thread safe.
10  *
11  *  The job list routines were re-written in May 2005 to
12  *  eliminate the global lock while traversing the list, and
13  *  to use the dlist subroutines.  The locking is now done
14  *  on the list each time the list is modified or traversed.
15  *  That is it is "micro-locked" rather than globally locked.
16  *  The result is that there is one lock/unlock for each entry
17  *  in the list while traversing it rather than a single lock
18  *  at the beginning of a traversal and one at the end.  This
19  *  incurs slightly more overhead, but effectively eliminates 
20  *  the possibilty of race conditions.  In addition, with the
21  *  exception of the global locking of the list during the
22  *  re-reading of the config file, no recursion is needed.
23  *
24  */
25 /*
26    Copyright (C) 2000-2006 Kern Sibbald
27
28    This program is free software; you can redistribute it and/or
29    modify it under the terms of the GNU General Public License
30    version 2 as amended with additional clauses defined in the
31    file LICENSE in the main source directory.
32
33    This program is distributed in the hope that it will be useful,
34    but WITHOUT ANY WARRANTY; without even the implied warranty of
35    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
36    the file LICENSE for additional details.
37
38  */
39
40 #include "bacula.h"
41 #include "jcr.h"
42
43 /* External variables we reference */
44 extern time_t watchdog_time;
45
46 /* Forward referenced functions */
47 extern "C" void timeout_handler(int sig);
48 static void jcr_timeout_check(watchdog_t *self);
49 #ifdef TRACE_JCR_CHAIN
50 static void b_lock_jcr_chain(const char *filen, int line);
51 static void b_unlock_jcr_chain(const char *filen, int line);
52 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
53 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
54 #else
55 static void lock_jcr_chain();
56 static void unlock_jcr_chain();
57 #endif
58
59
60 int num_jobs_run;
61 dlist *last_jobs = NULL;
62 const int max_last_jobs = 10;
63  
64 static dlist *jcrs = NULL;            /* JCR chain */
65 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
66
67 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
68
69 static pthread_mutex_t last_jobs_mutex = PTHREAD_MUTEX_INITIALIZER;
70
71
72 void lock_jobs()
73 {
74    P(job_start_mutex);
75 }
76
77 void unlock_jobs()
78 {
79    V(job_start_mutex);
80 }
81
82 void init_last_jobs_list()
83 {
84    JCR *jcr = NULL;
85    struct s_last_job *job_entry = NULL;
86    if (!last_jobs) {
87       last_jobs = New(dlist(job_entry, &job_entry->link));
88    }
89    if (!jcrs) {
90       jcrs = New(dlist(jcr, &jcr->link));
91    }
92 }
93
94 void term_last_jobs_list()
95 {
96    if (last_jobs) {
97       while (!last_jobs->empty()) {
98          void *je = last_jobs->first();
99          last_jobs->remove(je);
100          free(je);
101       }
102       delete last_jobs;
103       last_jobs = NULL;
104    }
105    if (jcrs) {
106       delete jcrs;
107       jcrs = NULL;
108    }
109 }
110
111 bool read_last_jobs_list(int fd, uint64_t addr)
112 {
113    struct s_last_job *je, job;
114    uint32_t num;
115
116    Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
117    if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
118       return false;
119    }
120    if (read(fd, &num, sizeof(num)) != sizeof(num)) {
121       return false;
122    }
123    Dmsg1(100, "Read num_items=%d\n", num);
124    if (num > 4 * max_last_jobs) {  /* sanity check */
125       return false;
126    }
127    for ( ; num; num--) {
128       if (read(fd, &job, sizeof(job)) != sizeof(job)) {
129          Dmsg1(000, "Read job entry. ERR=%s\n", strerror(errno));
130          return false;
131       }
132       if (job.JobId > 0) {
133          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
134          memcpy((char *)je, (char *)&job, sizeof(job));
135          if (!last_jobs) {
136             init_last_jobs_list();
137          }
138          last_jobs->append(je);
139          if (last_jobs->size() > max_last_jobs) {
140             je = (struct s_last_job *)last_jobs->first();
141             last_jobs->remove(je);
142             free(je);
143          }
144       }
145    }
146    return true;
147 }
148
149 uint64_t write_last_jobs_list(int fd, uint64_t addr)
150 {
151    struct s_last_job *je;
152    uint32_t num;
153
154    Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
155    if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
156       return 0;
157    }
158    if (last_jobs) {
159       /* First record is number of entires */
160       num = last_jobs->size();
161       if (write(fd, &num, sizeof(num)) != sizeof(num)) {
162          Dmsg1(000, "Error writing num_items: ERR=%s\n", strerror(errno));
163          return 0;
164       }
165       foreach_dlist(je, last_jobs) {
166          if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
167             Dmsg1(000, "Error writing job: ERR=%s\n", strerror(errno));
168             return 0;
169          }
170       }
171    }
172    /* Return current address */
173    ssize_t stat = lseek(fd, 0, SEEK_CUR);
174    if (stat < 0) {
175       stat = 0;
176    }
177    return stat;
178
179 }
180
181 void lock_last_jobs_list()
182 {
183    P(last_jobs_mutex);
184 }
185
186 void unlock_last_jobs_list()
187 {
188    V(last_jobs_mutex);
189 }
190
191 /*
192  * Push a subroutine address into the job end callback stack
193  */
194 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
195 {
196    jcr->job_end_push.append((void *)job_end_cb);
197    jcr->job_end_push.append(ctx);
198 }
199
200 /* Pop each job_end subroutine and call it */
201 static void job_end_pop(JCR *jcr)
202 {
203    void (*job_end_cb)(JCR *jcr, void *ctx);
204    void *ctx;
205    for (int i=jcr->job_end_push.size()-1; i > 0; ) {
206       ctx = jcr->job_end_push.get(i--);
207       job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
208       job_end_cb(jcr, ctx);
209    }
210 }
211
212 /*
213  * Create a Job Control Record and link it into JCR chain
214  * Returns newly allocated JCR
215  * Note, since each daemon has a different JCR, he passes
216  *  us the size.
217  */
218 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
219 {
220    JCR *jcr;
221    MQUEUE_ITEM *item = NULL;
222    struct sigaction sigtimer;
223
224    Dmsg0(3400, "Enter new_jcr\n");
225    jcr = (JCR *)malloc(size);
226    memset(jcr, 0, size);
227    jcr->my_thread_id = pthread_self();
228    jcr->msg_queue = New(dlist(item, &item->link));
229    jcr->job_end_push.init(1, false);
230    jcr->sched_time = time(NULL);
231    jcr->daemon_free_jcr = daemon_free_jcr;    /* plug daemon free routine */
232    jcr->init_mutex();
233    jcr->inc_use_count();   
234    jcr->JobStatus = JS_Created;       /* ready to run */
235    jcr->VolumeName = get_pool_memory(PM_FNAME);
236    jcr->VolumeName[0] = 0;
237    jcr->errmsg = get_pool_memory(PM_MESSAGE);
238    jcr->errmsg[0] = 0;
239    /* Setup some dummy values */
240    bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
241    jcr->JobId = 0;
242    jcr->JobType = JT_SYSTEM;          /* internal job until defined */
243    jcr->JobLevel = L_NONE;
244    jcr->JobStatus = JS_Created;
245
246    sigtimer.sa_flags = 0;
247    sigtimer.sa_handler = timeout_handler;
248    sigfillset(&sigtimer.sa_mask);
249    sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
250
251    /*
252     * Locking jobs is a global lock that is needed
253     * so that the Director can stop new jobs from being
254     * added to the jcr chain while it processes a new
255     * conf file and does the job_end_push().
256     */
257    lock_jobs();
258    lock_jcr_chain();
259    if (!jcrs) {
260       jcrs = New(dlist(jcr, &jcr->link));
261    }
262    jcrs->append(jcr);
263    unlock_jcr_chain();
264    unlock_jobs();
265
266    return jcr;
267 }
268
269
270 /*
271  * Remove a JCR from the chain
272  * NOTE! The chain must be locked prior to calling
273  *       this routine.
274  */
275 static void remove_jcr(JCR *jcr)
276 {
277    Dmsg0(3400, "Enter remove_jcr\n");
278    if (!jcr) {
279       Emsg0(M_ABORT, 0, _("NULL jcr.\n"));
280    }
281    jcrs->remove(jcr);
282    Dmsg0(3400, "Leave remove_jcr\n");
283 }
284
285 /*
286  * Free stuff common to all JCRs.  N.B. Be careful to include only
287  *  generic stuff in the common part of the jcr.
288  */
289 static void free_common_jcr(JCR *jcr)
290 {
291    struct s_last_job *je, last_job;
292
293    /* Keep some statistics */
294    switch (jcr->JobType) {
295    case JT_BACKUP:
296    case JT_VERIFY:
297    case JT_RESTORE:
298    case JT_MIGRATE:
299    case JT_COPY:
300    case JT_ADMIN:
301       num_jobs_run++;
302       last_job.Errors = jcr->Errors;
303       last_job.JobType = jcr->JobType;
304       last_job.JobId = jcr->JobId;
305       last_job.VolSessionId = jcr->VolSessionId;
306       last_job.VolSessionTime = jcr->VolSessionTime;
307       bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
308       last_job.JobFiles = jcr->JobFiles;
309       last_job.JobBytes = jcr->JobBytes;
310       last_job.JobStatus = jcr->JobStatus;
311       last_job.JobLevel = jcr->JobLevel;
312       last_job.start_time = jcr->start_time;
313       last_job.end_time = time(NULL);
314       /* Keep list of last jobs, but not Console where JobId==0 */
315       if (last_job.JobId > 0) {
316          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
317          memcpy((char *)je, (char *)&last_job, sizeof(last_job));
318          if (!last_jobs) {
319             init_last_jobs_list();
320          }
321          last_jobs->append(je);
322          if (last_jobs->size() > max_last_jobs) {
323             je = (struct s_last_job *)last_jobs->first();
324             last_jobs->remove(je);
325             free(je);
326          }
327       }
328       break;
329    default:
330       break;
331    }
332    jcr->destroy_mutex();
333
334    delete jcr->msg_queue;
335    close_msg(jcr);                    /* close messages for this job */
336
337    /* do this after closing messages */
338    if (jcr->client_name) {
339       free_pool_memory(jcr->client_name);
340       jcr->client_name = NULL;
341    }
342
343    if (jcr->attr) {
344       free_pool_memory(jcr->attr);
345       jcr->attr = NULL;
346    }
347
348    if (jcr->sd_auth_key) {
349       free(jcr->sd_auth_key);
350       jcr->sd_auth_key = NULL;
351    }
352    if (jcr->VolumeName) {
353       free_pool_memory(jcr->VolumeName);
354       jcr->VolumeName = NULL;
355    }
356
357    if (jcr->dir_bsock) {
358       bnet_close(jcr->dir_bsock);
359       jcr->dir_bsock = NULL;
360    }
361    if (jcr->errmsg) {
362       free_pool_memory(jcr->errmsg);
363       jcr->errmsg = NULL;
364    }
365    if (jcr->where) {
366       free(jcr->where);
367       jcr->where = NULL;
368    }
369    if (jcr->cached_path) {
370       free_pool_memory(jcr->cached_path);
371       jcr->cached_path = NULL;
372       jcr->cached_pnl = 0;
373    }
374    free_getuser_cache();
375    free_getgroup_cache();
376    free(jcr);
377 }
378
379 /*
380  * Global routine to free a jcr
381  */
382 #ifdef DEBUG
383 void b_free_jcr(const char *file, int line, JCR *jcr)
384 {
385    Dmsg3(3400, "Enter free_jcr 0x%x from %s:%d\n", jcr, file, line);
386
387 #else
388
389 void free_jcr(JCR *jcr)
390 {
391
392    Dmsg2(3400, "Enter free_jcr 0x%x job=%d\n", jcr, jcr->JobId);
393
394 #endif
395
396    dequeue_messages(jcr);
397    lock_jcr_chain();
398    jcr->dec_use_count();              /* decrement use count */
399    if (jcr->use_count() < 0) {
400       Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
401          jcr->use_count(), jcr->JobId);
402    }
403    Dmsg3(3400, "Dec free_jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count(), jcr->JobId);
404    if (jcr->use_count() > 0) {          /* if in use */
405       unlock_jcr_chain();
406       Dmsg3(3400, "free_jcr 0x%x job=%d use_count=%d\n", jcr, jcr->JobId, jcr->use_count());
407       return;
408    }
409
410    remove_jcr(jcr);                   /* remove Jcr from chain */
411    unlock_jcr_chain();
412
413    job_end_pop(jcr);                  /* pop and call hooked routines */
414
415    Dmsg1(3400, "End job=%d\n", jcr->JobId);
416    if (jcr->daemon_free_jcr) {
417       jcr->daemon_free_jcr(jcr);      /* call daemon free routine */
418    }
419    free_common_jcr(jcr);
420    close_msg(NULL);                   /* flush any daemon messages */
421    garbage_collect_memory_pool();
422    Dmsg0(3400, "Exit free_jcr\n");
423 }
424
425
426 /*
427  * Given a JobId, find the JCR
428  *   Returns: jcr on success
429  *            NULL on failure
430  */
431 JCR *get_jcr_by_id(uint32_t JobId)
432 {
433    JCR *jcr;
434
435    foreach_jcr(jcr) {
436       if (jcr->JobId == JobId) {
437          jcr->inc_use_count();
438          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
439          break;
440       }
441    }
442    endeach_jcr(jcr);
443    return jcr;
444 }
445
446 /*
447  * Given a SessionId and SessionTime, find the JCR
448  *   Returns: jcr on success
449  *            NULL on failure
450  */
451 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
452 {
453    JCR *jcr;
454
455    foreach_jcr(jcr) {
456       if (jcr->VolSessionId == SessionId &&
457           jcr->VolSessionTime == SessionTime) {
458          jcr->inc_use_count();
459          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
460          break;
461       }
462    }
463    endeach_jcr(jcr);
464    return jcr;
465 }
466
467
468 /*
469  * Given a Job, find the JCR
470  *  compares on the number of characters in Job
471  *  thus allowing partial matches.
472  *   Returns: jcr on success
473  *            NULL on failure
474  */
475 JCR *get_jcr_by_partial_name(char *Job)
476 {
477    JCR *jcr;
478    int len;
479
480    if (!Job) {
481       return NULL;
482    }
483    len = strlen(Job);
484    foreach_jcr(jcr) {
485       if (strncmp(Job, jcr->Job, len) == 0) {
486          jcr->inc_use_count();
487          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
488          break;
489       }
490    }
491    endeach_jcr(jcr);
492    return jcr;
493 }
494
495
496 /*
497  * Given a Job, find the JCR
498  *  requires an exact match of names.
499  *   Returns: jcr on success
500  *            NULL on failure
501  */
502 JCR *get_jcr_by_full_name(char *Job)
503 {
504    JCR *jcr;
505
506    if (!Job) {
507       return NULL;
508    }
509    foreach_jcr(jcr) {
510       if (strcmp(jcr->Job, Job) == 0) {
511          jcr->inc_use_count();
512          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count());
513          break;
514       }
515    }
516    endeach_jcr(jcr);
517    return jcr;
518 }
519
520 void set_jcr_job_status(JCR *jcr, int JobStatus)
521 {
522    /*
523     * For a set of errors, ... keep the current status
524     *   so it isn't lost. For all others, set it.
525     */
526    switch (jcr->JobStatus) {
527    case JS_ErrorTerminated:
528    case JS_Error:
529    case JS_FatalError:
530    case JS_Differences:
531    case JS_Canceled:
532       break;
533    default:
534       jcr->JobStatus = JobStatus;
535    }
536 }
537
538 #ifdef TRACE_JCR_CHAIN
539 static int lock_count = 0;
540 #endif
541
542 /*
543  * Lock the chain
544  */
545 #ifdef TRACE_JCR_CHAIN
546 static void b_lock_jcr_chain(const char *fname, int line)
547 #else
548 static void lock_jcr_chain()
549 #endif
550 {
551 #ifdef TRACE_JCR_CHAIN
552    Dmsg3(3400, "Lock jcr chain %d from %s:%d\n", ++lock_count,
553       fname, line);
554 #endif
555    P(jcr_lock);
556 }
557
558 /*
559  * Unlock the chain
560  */
561 #ifdef TRACE_JCR_CHAIN
562 static void b_unlock_jcr_chain(const char *fname, int line)
563 #else
564 static void unlock_jcr_chain()
565 #endif
566 {
567 #ifdef TRACE_JCR_CHAIN
568    Dmsg3(3400, "Unlock jcr chain %d from %s:%d\n", lock_count--,
569       fname, line);
570 #endif
571    V(jcr_lock);
572 }
573
574
575 /*
576  * Start walk of jcr chain
577  * The proper way to walk the jcr chain is:
578  *    JCR *jcr;
579  *    foreach_jcr(jcr) {
580  *      ...
581  *    }
582  *    endeach_jcr(jcr);
583  *
584  *  It is possible to leave out the endeach_jcr(jcr), but
585  *   in that case, the last jcr referenced must be explicitly
586  *   released with:
587  *
588  *    free_jcr(jcr);
589  *  
590  */
591 JCR *jcr_walk_start() 
592 {
593    JCR *jcr;
594    lock_jcr_chain();
595    jcr = (JCR *)jcrs->first();
596    if (jcr) {
597       jcr->inc_use_count();
598       Dmsg3(3400, "Inc jcr_walk_start 0x%x job=%d use_count=%d\n", jcr, 
599             jcr->JobId, jcr->use_count());
600    }
601    unlock_jcr_chain();
602    return jcr;
603 }
604
605 /*
606  * Get next jcr from chain, and release current one
607  */
608 JCR *jcr_walk_next(JCR *prev_jcr)
609 {
610    JCR *jcr;
611
612    lock_jcr_chain();
613    jcr = (JCR *)jcrs->next(prev_jcr);
614    if (jcr) {
615       jcr->inc_use_count();
616       Dmsg3(3400, "Inc jcr_walk_next 0x%x job=%d use_count=%d\n", jcr, 
617          jcr->JobId, jcr->use_count());
618    }
619    unlock_jcr_chain();
620    if (prev_jcr) {
621       free_jcr(prev_jcr);
622    }
623    return jcr;
624 }
625
626 /*
627  * Release last jcr referenced
628  */
629 void jcr_walk_end(JCR *jcr)
630 {
631    if (jcr) {
632       free_jcr(jcr);
633    }
634 }
635
636
637 /*
638  * Setup to call the timeout check routine every 30 seconds
639  *  This routine will check any timers that have been enabled.
640  */
641 bool init_jcr_subsystem(void)
642 {
643    watchdog_t *wd = new_watchdog();
644
645    wd->one_shot = false;
646    wd->interval = 30;   /* FIXME: should be configurable somewhere, even
647                          if only with a #define */
648    wd->callback = jcr_timeout_check;
649
650    register_watchdog(wd);
651
652    return true;
653 }
654
655 static void jcr_timeout_check(watchdog_t *self)
656 {
657    JCR *jcr;
658    BSOCK *fd;
659    time_t timer_start;
660
661    Dmsg0(3400, "Start JCR timeout checks\n");
662
663    /* Walk through all JCRs checking if any one is
664     * blocked for more than specified max time.
665     */
666    foreach_jcr(jcr) {
667       Dmsg2(3400, "jcr_timeout_check JobId=%u jcr=0x%x\n", jcr->JobId, jcr);
668       if (jcr->JobId == 0) {
669          continue;
670       }
671       fd = jcr->store_bsock;
672       if (fd) {
673          timer_start = fd->timer_start;
674          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
675             fd->timer_start = 0;      /* turn off timer */
676             fd->timed_out = true;
677             Jmsg(jcr, M_ERROR, 0, _(
678 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
679                  watchdog_time - timer_start);
680             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
681          }
682       }
683       fd = jcr->file_bsock;
684       if (fd) {
685          timer_start = fd->timer_start;
686          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
687             fd->timer_start = 0;      /* turn off timer */
688             fd->timed_out = true;
689             Jmsg(jcr, M_ERROR, 0, _(
690 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
691                  watchdog_time - timer_start);
692             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
693          }
694       }
695       fd = jcr->dir_bsock;
696       if (fd) {
697          timer_start = fd->timer_start;
698          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
699             fd->timer_start = 0;      /* turn off timer */
700             fd->timed_out = true;
701             Jmsg(jcr, M_ERROR, 0, _(
702 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
703                  watchdog_time - timer_start);
704             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
705          }
706       }
707    }
708    endeach_jcr(jcr);
709
710    Dmsg0(3400, "Finished JCR timeout checks\n");
711 }
712
713 /*
714  * Timeout signal comes here
715  */
716 extern "C" void timeout_handler(int sig)
717 {
718    return;                            /* thus interrupting the function */
719 }