]> git.sur5r.net Git - bacula/bacula/blob - bacula/src/lib/jcr.c
Use light weight non-recursive locking on jcr chain.
[bacula/bacula] / bacula / src / lib / jcr.c
1 /*
2  * Manipulation routines for Job Control Records and
3  *  handling of last_jobs_list.
4  *
5  *  Kern E. Sibbald, December 2000
6  *
7  *  Version $Id$
8  *
9  *  These routines are thread safe.
10  *
11  *  The job list routines were re-written in May 2005 to
12  *  eliminate the global lock while traversing the list, and
13  *  to use the dlist subroutines.  The locking is now done
14  *  on the list each time the list is modified or traversed.
15  *  That is it is "micro-locked" rather than globally locked.
16  *  The result is that there is one lock/unlock for each entry
17  *  in the list while traversing it rather than a single lock
18  *  at the beginning of a traversal and one at the end.  This
19  *  incurs slightly more overhead, but effectively eliminates 
20  *  the possibilty of race conditions.  In addition, with the
21  *  exception of the global locking of the list during the
22  *  re-reading of the config file, no recursion is needed.
23  *
24  */
25 /*
26    Copyright (C) 2000-2005 Kern Sibbald
27
28    This program is free software; you can redistribute it and/or
29    modify it under the terms of the GNU General Public License
30    version 2 as ammended with additional clauses defined in the
31    file LICENSE in the main source directory.
32
33    This program is distributed in the hope that it will be useful,
34    but WITHOUT ANY WARRANTY; without even the implied warranty of
35    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 
36    the file LICENSE for additional details.
37
38  */
39
40 #include "bacula.h"
41 #include "jcr.h"
42
43 /* External variables we reference */
44 extern time_t watchdog_time;
45
46 /* Forward referenced functions */
47 extern "C" void timeout_handler(int sig);
48 static void jcr_timeout_check(watchdog_t *self);
49 #ifdef TRACE_JCR_CHAIN
50 static void b_lock_jcr_chain(const char *filen, int line);
51 static void b_unlock_jcr_chain(const char *filen, int line);
52 #define lock_jcr_chain() b_lock_jcr_chain(__FILE__, __LINE__);
53 #define unlock_jcr_chain() b_unlock_jcr_chain(__FILE__, __LINE__);
54 #else
55 static void lock_jcr_chain();
56 static void unlock_jcr_chain();
57 #endif
58
59
60 int num_jobs_run;
61 dlist *last_jobs = NULL;
62 const int max_last_jobs = 10;
63  
64 static dlist *jcrs = NULL;            /* JCR chain */
65 //static brwlock_t lock;                /* lock for last jobs and JCR chain */
66 static pthread_mutex_t jcr_lock = PTHREAD_MUTEX_INITIALIZER;
67
68 static pthread_mutex_t job_start_mutex = PTHREAD_MUTEX_INITIALIZER;
69
70 void lock_jobs()
71 {
72    P(job_start_mutex);
73 }
74
75 void unlock_jobs()
76 {
77    V(job_start_mutex);
78 }
79
80 void init_last_jobs_list()
81 {
82 // int errstat;
83    JCR *jcr;
84    struct s_last_job *job_entry = NULL;
85    if (!last_jobs) {
86       last_jobs = New(dlist(job_entry, &job_entry->link));
87 //    if ((errstat=rwl_init(&lock)) != 0) {
88 //       Emsg1(M_ABORT, 0, _("Unable to initialize jcr_chain lock. ERR=%s\n"),
89 //             strerror(errstat));
90 //    }
91    }
92    if (!jcrs) {
93       jcrs = New(dlist(jcr, &jcr->link));
94    }
95 }
96
97 void term_last_jobs_list()
98 {
99    if (last_jobs) {
100       while (!last_jobs->empty()) {
101          void *je = last_jobs->first();
102          last_jobs->remove(je);
103          free(je);
104       }
105       delete last_jobs;
106       last_jobs = NULL;
107 //    rwl_destroy(&lock);
108       delete jcrs;
109    }
110 }
111
112 void read_last_jobs_list(int fd, uint64_t addr)
113 {
114    struct s_last_job *je, job;
115    uint32_t num;
116
117    Dmsg1(100, "read_last_jobs seek to %d\n", (int)addr);
118    if (addr == 0 || lseek(fd, (off_t)addr, SEEK_SET) < 0) {
119       return;
120    }
121    if (read(fd, &num, sizeof(num)) != sizeof(num)) {
122       return;
123    }
124    Dmsg1(100, "Read num_items=%d\n", num);
125    if (num > 4 * max_last_jobs) {  /* sanity check */
126       return;
127    }
128    for ( ; num; num--) {
129       if (read(fd, &job, sizeof(job)) != sizeof(job)) {
130          Dmsg1(000, "Read job entry. ERR=%s\n", strerror(errno));
131          return;
132       }
133       if (job.JobId > 0) {
134          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
135          memcpy((char *)je, (char *)&job, sizeof(job));
136          if (!last_jobs) {
137             init_last_jobs_list();
138          }
139          last_jobs->append(je);
140          if (last_jobs->size() > max_last_jobs) {
141             je = (struct s_last_job *)last_jobs->first();
142             last_jobs->remove(je);
143             free(je);
144          }
145       }
146    }
147 }
148
149 uint64_t write_last_jobs_list(int fd, uint64_t addr)
150 {
151    struct s_last_job *je;
152    uint32_t num;
153
154    Dmsg1(100, "write_last_jobs seek to %d\n", (int)addr);
155    if (lseek(fd, (off_t)addr, SEEK_SET) < 0) {
156       return 0;
157    }
158    if (last_jobs) {
159       /* First record is number of entires */
160       num = last_jobs->size();
161       if (write(fd, &num, sizeof(num)) != sizeof(num)) {
162          Dmsg1(000, "Error writing num_items: ERR=%s\n", strerror(errno));
163          return 0;
164       }
165       foreach_dlist(je, last_jobs) {
166          if (write(fd, je, sizeof(struct s_last_job)) != sizeof(struct s_last_job)) {
167             Dmsg1(000, "Error writing job: ERR=%s\n", strerror(errno));
168             return 0;
169          }
170       }
171    }
172    /* Return current address */
173    ssize_t stat = lseek(fd, 0, SEEK_CUR);
174    if (stat < 0) {
175       stat = 0;
176    }
177    return stat;
178
179 }
180
181 void lock_last_jobs_list()
182 {
183    /* Use jcr chain mutex */
184    lock_jcr_chain();
185 }
186
187 void unlock_last_jobs_list()
188 {
189    /* Use jcr chain mutex */
190    unlock_jcr_chain();
191 }
192
193 /*
194  * Push a subroutine address into the job end callback stack
195  */
196 void job_end_push(JCR *jcr, void job_end_cb(JCR *jcr,void *), void *ctx)
197 {
198    jcr->job_end_push.append((void *)job_end_cb);
199    jcr->job_end_push.append(ctx);
200 }
201
202 /* Pop each job_end subroutine and call it */
203 static void job_end_pop(JCR *jcr)
204 {
205    void (*job_end_cb)(JCR *jcr, void *ctx);
206    void *ctx;
207    for (int i=jcr->job_end_push.size()-1; i > 0; ) {
208       ctx = jcr->job_end_push.get(i--);
209       job_end_cb = (void (*)(JCR *,void *))jcr->job_end_push.get(i--);
210       job_end_cb(jcr, ctx);
211    }
212 }
213
214 /*
215  * Create a Job Control Record and link it into JCR chain
216  * Returns newly allocated JCR
217  * Note, since each daemon has a different JCR, he passes
218  *  us the size.
219  */
220 JCR *new_jcr(int size, JCR_free_HANDLER *daemon_free_jcr)
221 {
222    JCR *jcr;
223    MQUEUE_ITEM *item = NULL;
224    struct sigaction sigtimer;
225
226    Dmsg0(3400, "Enter new_jcr\n");
227    jcr = (JCR *)malloc(size);
228    memset(jcr, 0, size);
229    jcr->my_thread_id = pthread_self();
230    jcr->msg_queue = New(dlist(item, &item->link));
231    jcr->job_end_push.init(1, false);
232    jcr->sched_time = time(NULL);
233    jcr->daemon_free_jcr = daemon_free_jcr;    /* plug daemon free routine */
234    jcr->use_count = 1;
235    pthread_mutex_init(&(jcr->mutex), NULL);
236    jcr->JobStatus = JS_Created;       /* ready to run */
237    jcr->VolumeName = get_pool_memory(PM_FNAME);
238    jcr->VolumeName[0] = 0;
239    jcr->errmsg = get_pool_memory(PM_MESSAGE);
240    jcr->errmsg[0] = 0;
241    /* Setup some dummy values */
242    bstrncpy(jcr->Job, "*System*", sizeof(jcr->Job));
243    jcr->JobId = 0;
244    jcr->JobType = JT_SYSTEM;          /* internal job until defined */
245    jcr->JobLevel = L_NONE;
246    jcr->JobStatus = JS_Created;
247
248    sigtimer.sa_flags = 0;
249    sigtimer.sa_handler = timeout_handler;
250    sigfillset(&sigtimer.sa_mask);
251    sigaction(TIMEOUT_SIGNAL, &sigtimer, NULL);
252
253    /*
254     * Locking jobs is a global lock that is needed
255     * so that the Director can stop new jobs from being
256     * added to the jcr chain while it processes a new
257     * conf file and does the job_end_push().
258     */
259    lock_jobs();
260    lock_jcr_chain();
261    if (!jcrs) {
262       jcrs = New(dlist(jcr, &jcr->link));
263    }
264    jcrs->append(jcr);
265    unlock_jcr_chain();
266    unlock_jobs();
267
268    return jcr;
269 }
270
271
272 /*
273  * Remove a JCR from the chain
274  * NOTE! The chain must be locked prior to calling
275  *       this routine.
276  */
277 static void remove_jcr(JCR *jcr)
278 {
279    Dmsg0(3400, "Enter remove_jcr\n");
280    if (!jcr) {
281       Emsg0(M_ABORT, 0, "NULL jcr.\n");
282    }
283    jcrs->remove(jcr);
284    Dmsg0(3400, "Leave remove_jcr\n");
285 }
286
287 /*
288  * Free stuff common to all JCRs.  N.B. Be careful to include only
289  *  generic stuff in the common part of the jcr.
290  */
291 static void free_common_jcr(JCR *jcr)
292 {
293    struct s_last_job *je, last_job;
294
295    /* Keep some statistics */
296    switch (jcr->JobType) {
297    case JT_BACKUP:
298    case JT_VERIFY:
299    case JT_RESTORE:
300    case JT_ADMIN:
301       num_jobs_run++;
302       last_job.Errors = jcr->Errors;
303       last_job.JobType = jcr->JobType;
304       last_job.JobId = jcr->JobId;
305       last_job.VolSessionId = jcr->VolSessionId;
306       last_job.VolSessionTime = jcr->VolSessionTime;
307       bstrncpy(last_job.Job, jcr->Job, sizeof(last_job.Job));
308       last_job.JobFiles = jcr->JobFiles;
309       last_job.JobBytes = jcr->JobBytes;
310       last_job.JobStatus = jcr->JobStatus;
311       last_job.JobLevel = jcr->JobLevel;
312       last_job.start_time = jcr->start_time;
313       last_job.end_time = time(NULL);
314       /* Keep list of last jobs, but not Console where JobId==0 */
315       if (last_job.JobId > 0) {
316          je = (struct s_last_job *)malloc(sizeof(struct s_last_job));
317          memcpy((char *)je, (char *)&last_job, sizeof(last_job));
318          if (!last_jobs) {
319             init_last_jobs_list();
320          }
321          last_jobs->append(je);
322          if (last_jobs->size() > max_last_jobs) {
323             je = (struct s_last_job *)last_jobs->first();
324             last_jobs->remove(je);
325             free(je);
326          }
327       }
328       break;
329    default:
330       break;
331    }
332    pthread_mutex_destroy(&jcr->mutex);
333
334    delete jcr->msg_queue;
335    close_msg(jcr);                    /* close messages for this job */
336
337    /* do this after closing messages */
338    if (jcr->client_name) {
339       free_pool_memory(jcr->client_name);
340       jcr->client_name = NULL;
341    }
342
343    if (jcr->attr) {
344       free_pool_memory(jcr->attr);
345       jcr->attr = NULL;
346    }
347
348    if (jcr->sd_auth_key) {
349       free(jcr->sd_auth_key);
350       jcr->sd_auth_key = NULL;
351    }
352    if (jcr->VolumeName) {
353       free_pool_memory(jcr->VolumeName);
354       jcr->VolumeName = NULL;
355    }
356
357    if (jcr->dir_bsock) {
358       bnet_close(jcr->dir_bsock);
359       jcr->dir_bsock = NULL;
360    }
361    if (jcr->errmsg) {
362       free_pool_memory(jcr->errmsg);
363       jcr->errmsg = NULL;
364    }
365    if (jcr->where) {
366       free(jcr->where);
367       jcr->where = NULL;
368    }
369    if (jcr->cached_path) {
370       free_pool_memory(jcr->cached_path);
371       jcr->cached_path = NULL;
372       jcr->cached_pnl = 0;
373    }
374    free_getuser_cache();
375    free_getgroup_cache();
376    free(jcr);
377 }
378
379 /*
380  * Global routine to free a jcr
381  */
382 #ifdef DEBUG
383 void b_free_jcr(const char *file, int line, JCR *jcr)
384 {
385    Dmsg3(3400, "Enter free_jcr 0x%x from %s:%d\n", jcr, file, line);
386
387 #else
388
389 void free_jcr(JCR *jcr)
390 {
391
392    Dmsg1(3400, "Enter free_jcr 0x%x\n", jcr);
393
394 #endif
395
396    dequeue_messages(jcr);
397    lock_jcr_chain();
398    jcr->dec_use_count();              /* decrement use count */
399    if (jcr->use_count < 0) {
400       Emsg2(M_ERROR, 0, _("JCR use_count=%d JobId=%d\n"),
401          jcr->use_count, jcr->JobId);
402    }
403    Dmsg3(3400, "Dec free_jcr 0x%x use_count=%d jobid=%d\n", jcr, jcr->use_count, jcr->JobId);
404    if (jcr->use_count > 0) {          /* if in use */
405       unlock_jcr_chain();
406       Dmsg2(3400, "free_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
407       return;
408    }
409
410    remove_jcr(jcr);                   /* remove Jcr from chain */
411    unlock_jcr_chain();
412
413    job_end_pop(jcr);                  /* pop and call hooked routines */
414
415    Dmsg1(3400, "End job=%d\n", jcr->JobId);
416    if (jcr->daemon_free_jcr) {
417       jcr->daemon_free_jcr(jcr);      /* call daemon free routine */
418    }
419    free_common_jcr(jcr);
420    close_msg(NULL);                   /* flush any daemon messages */
421    Dmsg0(3400, "Exit free_jcr\n");
422 }
423
424
425 /*
426  * Given a JobId, find the JCR
427  *   Returns: jcr on success
428  *            NULL on failure
429  */
430 JCR *get_jcr_by_id(uint32_t JobId)
431 {
432    JCR *jcr;
433
434    lock_jcr_chain();                    /* lock chain */
435    foreach_dlist(jcr, jcrs) {
436       if (jcr->JobId == JobId) {
437          jcr->inc_use_count();
438          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
439          break;
440       }
441    }
442    unlock_jcr_chain();
443    return jcr;
444 }
445
446 /*
447  * Given a SessionId and SessionTime, find the JCR
448  *   Returns: jcr on success
449  *            NULL on failure
450  */
451 JCR *get_jcr_by_session(uint32_t SessionId, uint32_t SessionTime)
452 {
453    JCR *jcr;
454
455    lock_jcr_chain();
456    foreach_dlist(jcr, jcrs) {
457       if (jcr->VolSessionId == SessionId &&
458           jcr->VolSessionTime == SessionTime) {
459          jcr->inc_use_count();
460          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
461          break;
462       }
463    }
464    unlock_jcr_chain();
465    return jcr;
466 }
467
468
469 /*
470  * Given a Job, find the JCR
471  *  compares on the number of characters in Job
472  *  thus allowing partial matches.
473  *   Returns: jcr on success
474  *            NULL on failure
475  */
476 JCR *get_jcr_by_partial_name(char *Job)
477 {
478    JCR *jcr;
479    int len;
480
481    if (!Job) {
482       return NULL;
483    }
484    lock_jcr_chain();
485    len = strlen(Job);
486    foreach_dlist(jcr, jcrs) {
487       if (strncmp(Job, jcr->Job, len) == 0) {
488          jcr->inc_use_count();
489          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
490          break;
491       }
492    }
493    unlock_jcr_chain();
494    return jcr;
495 }
496
497
498 /*
499  * Given a Job, find the JCR
500  *  requires an exact match of names.
501  *   Returns: jcr on success
502  *            NULL on failure
503  */
504 JCR *get_jcr_by_full_name(char *Job)
505 {
506    JCR *jcr;
507
508    if (!Job) {
509       return NULL;
510    }
511    lock_jcr_chain();
512    foreach_dlist(jcr, jcrs) {
513       if (strcmp(jcr->Job, Job) == 0) {
514          jcr->inc_use_count();
515          Dmsg2(3400, "Inc get_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
516          break;
517       }
518    }
519    unlock_jcr_chain();
520    return jcr;
521 }
522
523 void set_jcr_job_status(JCR *jcr, int JobStatus)
524 {
525    /*
526     * For a set of errors, ... keep the current status
527     *   so it isn't lost. For all others, set it.
528     */
529    switch (jcr->JobStatus) {
530    case JS_ErrorTerminated:
531    case JS_Error:
532    case JS_FatalError:
533    case JS_Differences:
534    case JS_Canceled:
535       break;
536    default:
537       jcr->JobStatus = JobStatus;
538    }
539 }
540
541 #ifdef TRACE_JCR_CHAIN
542 static int lock_count = 0;
543 #endif
544
545 /*
546  * Lock the chain
547  */
548 #ifdef TRACE_JCR_CHAIN
549 static void b_lock_jcr_chain(const char *fname, int line)
550 #else
551 static void lock_jcr_chain()
552 #endif
553 {
554 // int errstat;
555 #ifdef TRACE_JCR_CHAIN
556    Dmsg3(3400, "Lock jcr chain %d from %s:%d\n", ++lock_count,
557       fname, line);
558 #endif
559 // if ((errstat=rwl_writelock(&lock)) != 0) {
560 //    Emsg1(M_ABORT, 0, "rwl_writelock failure. ERR=%s\n",
561 //         strerror(errstat));
562 // }
563    P(jcr_lock);
564 }
565
566 /*
567  * Unlock the chain
568  */
569 #ifdef TRACE_JCR_CHAIN
570 static void b_unlock_jcr_chain(const char *fname, int line)
571 #else
572 static void unlock_jcr_chain()
573 #endif
574 {
575 // int errstat;
576 #ifdef TRACE_JCR_CHAIN
577    Dmsg3(3400, "Unlock jcr chain %d from %s:%d\n", lock_count--,
578       fname, line);
579 #endif
580 // if ((errstat=rwl_writeunlock(&lock)) != 0) {
581 //    Emsg1(M_ABORT, 0, "rwl_writeunlock failure. ERR=%s\n",
582 //         strerror(errstat));
583 // }
584    V(jcr_lock);
585 }
586
587
588 JCR *get_next_jcr(JCR *prev_jcr)
589 {
590    JCR *jcr;
591
592    lock_jcr_chain();
593    jcr = (JCR *)jcrs->next(prev_jcr);
594    if (jcr) {
595       jcr->inc_use_count();
596       Dmsg2(3400, "Inc get_next_jcr 0x%x use_count=%d\n", jcr, jcr->use_count);
597    }
598    unlock_jcr_chain();
599    return jcr;
600 }
601
602 bool init_jcr_subsystem(void)
603 {
604    watchdog_t *wd = new_watchdog();
605
606    wd->one_shot = false;
607    wd->interval = 30;   /* FIXME: should be configurable somewhere, even
608                          if only with a #define */
609    wd->callback = jcr_timeout_check;
610
611    register_watchdog(wd);
612
613    return true;
614 }
615
616 static void jcr_timeout_check(watchdog_t *self)
617 {
618    JCR *jcr;
619    BSOCK *fd;
620    time_t timer_start;
621
622    Dmsg0(3400, "Start JCR timeout checks\n");
623
624    /* Walk through all JCRs checking if any one is
625     * blocked for more than specified max time.
626     */
627    foreach_jcr(jcr) {
628       if (jcr->JobId == 0) {
629          free_jcr(jcr);
630          continue;
631       }
632       fd = jcr->store_bsock;
633       if (fd) {
634          timer_start = fd->timer_start;
635          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
636             fd->timer_start = 0;      /* turn off timer */
637             fd->timed_out = TRUE;
638             Jmsg(jcr, M_ERROR, 0, _(
639 "Watchdog sending kill after %d secs to thread stalled reading Storage daemon.\n"),
640                  watchdog_time - timer_start);
641             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
642          }
643       }
644       fd = jcr->file_bsock;
645       if (fd) {
646          timer_start = fd->timer_start;
647          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
648             fd->timer_start = 0;      /* turn off timer */
649             fd->timed_out = TRUE;
650             Jmsg(jcr, M_ERROR, 0, _(
651 "Watchdog sending kill after %d secs to thread stalled reading File daemon.\n"),
652                  watchdog_time - timer_start);
653             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
654          }
655       }
656       fd = jcr->dir_bsock;
657       if (fd) {
658          timer_start = fd->timer_start;
659          if (timer_start && (watchdog_time - timer_start) > fd->timeout) {
660             fd->timer_start = 0;      /* turn off timer */
661             fd->timed_out = TRUE;
662             Jmsg(jcr, M_ERROR, 0, _(
663 "Watchdog sending kill after %d secs to thread stalled reading Director.\n"),
664                  watchdog_time - timer_start);
665             pthread_kill(jcr->my_thread_id, TIMEOUT_SIGNAL);
666          }
667       }
668       free_jcr(jcr);
669    }
670
671    Dmsg0(3400, "Finished JCR timeout checks\n");
672 }
673
674 /*
675  * Timeout signal comes here
676  */
677 extern "C" void timeout_handler(int sig)
678 {
679    return;                            /* thus interrupting the function */
680 }