]> git.sur5r.net Git - openldap/blob - servers/slurpd/rq.c
Merge termination bugfix from main. Provided by <vasquez@w270.de>.
[openldap] / servers / slurpd / rq.c
1 /*
2  * Copyright (c) 1996 Regents of the University of Michigan.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms are permitted
6  * provided that this notice is preserved and that due credit is given
7  * to the University of Michigan at Ann Arbor. The name of the University
8  * may not be used to endorse or promote products derived from this
9  * software without specific prior written permission. This software
10  * is provided ``as is'' without express or implied warranty.
11  */
12
13 /*
14  * rq.c - routines used to manage the queue of replication entries.
15  * An Rq (Replication queue) struct contains a linked list of Re
16  * (Replication entry) structures.
17  *
18  * Routines wishing to access the replication queue should do so through
19  * the Rq struct's member functions, e.g. rq->rq_gethead() and friends.
20  * For example, Re structs should be added to the queue by calling 
21  * the rq_add() member function.
22  *
23  * Access to the queue is serialized by a mutex.  Member functions which do
24  * not do their own locking should only be called after locking the queue
25  * using the rq_lock() member function.  The queue should be unlocked with
26  * the rq_unlock() member function.
27  *
28  * Note that some member functions handle their own locking internally.
29  * Callers should not lock the queue before calling these functions.
30  * See the comment block for each function below.
31  *
32  */
33
34 #include <stdio.h>
35
36 #include "slurp.h"
37 #include "globals.h"
38
39
40 /* externs */
41 #ifdef NEEDPROTOS
42 extern void Re_dump( Re *re );
43 #else /* NEEDPROTOS */
44 extern void Re_dump();
45 #endif /* NEEDPROTOS */
46
47
48 extern char *sys_errlist[];
49
50
51 /*
52  * Lock the replication queue.
53  */
54 static int
55 Rq_lock(
56     Rq  *rq
57 )
58 {
59     return( pthread_mutex_lock( &rq->rq_mutex ));
60 }
61
62
63
64
65
66 /*
67  * Unlock the replication queue.
68  */
69 static int
70 Rq_unlock(
71     Rq  *rq
72 )
73 {
74     return( pthread_mutex_unlock( &rq->rq_mutex ));
75 }
76
77
78
79 /*
80  * Return the head of the queue.  Callers should lock the queue before
81  * calling this routine.
82  */
83 static Re *
84 Rq_gethead(
85     Rq  *rq
86 )
87 {
88     return( rq == NULL ? NULL : rq->rq_head );
89 }
90
91
92
93
94 /*
95  * Return the next item in the queue.  Callers should lock the queue before
96  * calling this routine.
97  */
98 static Re *
99 Rq_getnext(
100     Re  *re
101 )
102 {
103     if ( re == NULL ) {
104         return NULL;
105     } else {
106         return( re->re_getnext( re ));
107     }
108 }
109
110
111
112
113 /*
114  * Delete the item at the head of the list.  The queue should be locked
115  * by the caller before calling this routine.
116  */
117 static int
118 Rq_delhead(
119     Rq  *rq
120 )
121 {
122     Re  *savedhead;
123     int rc;
124
125     if ( rq == NULL ) {
126         return( -1 );
127     }
128
129     savedhead = rq->rq_head;
130     if ( savedhead == NULL ) {
131         return( 0 );
132     }
133
134     if ( savedhead->re_getrefcnt( savedhead ) != 0 ) {
135         Debug( LDAP_DEBUG_ANY, "Warning: attempt to delete when refcnt != 0\n",
136                 0, 0, 0 );
137         return( -1 );
138     }
139
140     rq->rq_head = rq->rq_head->re_getnext( rq->rq_head );
141     rc = savedhead->re_free( savedhead );
142     rq->rq_nre--;       /* decrement count of Re's in queue */
143     return( rc );
144 }
145
146
147
148
149 /* 
150  * Add an entry to the tail of the replication queue.  Locking is handled
151  * internally.  When items are added to the queue, this routine wakes
152  * up any threads which are waiting for more work by signaling on the
153  * rq->rq_more condition variable.
154  */
155 static int
156 Rq_add(
157     Rq          *rq,
158     char        *buf
159 )
160 {
161     Re  *re;
162     int wasempty = 0;
163
164     /* Lock the queue */
165     rq->rq_lock( rq );
166
167     /* Create a new Re */
168     if ( Re_init( &re ) < 0 ) {
169         rq->rq_unlock( rq );
170         return -1;
171     }
172
173     /* parse buf and fill in the re struct */
174     if ( re->re_parse( re, buf ) < 0 ) {
175         re->re_free( re );
176         rq->rq_unlock( rq );
177         return -1;
178     }
179
180     /* Insert into queue */
181     if ( rq->rq_head == NULL ) {
182         rq->rq_head = re;
183         rq->rq_tail = re;
184         wasempty = 1;
185     } else {
186         rq->rq_tail->re_next = re;
187     }
188
189     /* set the sequence number */
190     re->re_seq = 0;
191     if ( !wasempty && !strcmp(rq->rq_tail->re_timestamp, re->re_timestamp )) {
192         /*
193          * Our new re has the same timestamp as the tail's timestamp.
194          * Increment the seq number in the tail and use it as our seq number.
195          */
196         re->re_seq = rq->rq_tail->re_seq + 1;
197     }
198     rq->rq_tail = re;
199
200     /* Increment count of items in queue */
201     rq->rq_nre++;
202     /* wake up any threads waiting for more work */
203     pthread_cond_broadcast( &rq->rq_more );
204
205     /* ... and unlock the queue */
206     rq->rq_unlock( rq );
207
208     return 0;
209 }
210
211
212
213
214 /*
215  * Garbage-collect the replication queue.  Locking is handled internally.
216  */
217 static void
218 Rq_gc(
219     Rq  *rq
220 )
221 {
222     if ( rq == NULL ) {
223         Debug( LDAP_DEBUG_ANY, "Rq_gc: rq is NULL!\n", 0, 0, 0 );
224         return;
225     }
226     rq->rq_lock( rq ); 
227     while (( rq->rq_head != NULL ) &&
228             ( rq->rq_head->re_getrefcnt( rq->rq_head ) == 0 )) {
229         rq->rq_delhead( rq );
230         rq->rq_ndel++;  /* increment count of deleted entries */
231     }
232     rq->rq_unlock( rq ); 
233     return;
234 }
235
236
237
238 /*
239  * For debugging: dump the contents of the replication queue to a file.
240  * Locking is handled internally.
241  */
242 static void
243 Rq_dump(
244     Rq  *rq
245 )
246 {
247     Re          *re;
248     FILE        *fp;
249
250     if ( rq == NULL ) {
251         Debug( LDAP_DEBUG_ANY, "Rq_dump: rq is NULL!\n", 0, 0, 0 );
252         return;
253     }
254
255     if (( fp = fopen( SLURPD_DUMPFILE, "w" )) == NULL ) {
256         Debug( LDAP_DEBUG_ANY, "Rq_dump: cannot open \"%s\" for write\n",
257                 SLURPD_DUMPFILE, 0, 0 );
258         return;
259     }
260
261     rq->rq_lock( rq );
262     for ( re = rq->rq_gethead( rq ); re != NULL; re = rq->rq_getnext( re )) {
263         re->re_dump( re, fp );
264     }
265     rq->rq_unlock( rq );
266     fclose( fp );
267     return;
268 }
269
270
271
272 /*
273  * Write the contents of a replication queue to a file.  Returns zero if
274  * successful, -1 if not.  Handles queue locking internally.  Callers should
275  * provide an open file pointer, which should refer to a locked file.
276  */
277 static int
278 Rq_write(
279     Rq          *rq,
280     FILE        *fp
281 )
282 {
283     Re          *re;
284     time_t      now;
285
286     if ( rq == NULL ) {
287         return -1;
288     }
289
290     Debug( LDAP_DEBUG_ARGS, "re-write on-disk replication log\n",
291             0, 0, 0 );
292 #ifndef SEEK_SET
293 #define SEEK_SET 0
294 #endif
295     fseek( fp, 0L, SEEK_SET );  /* Go to beginning of file */
296     rq->rq_lock( rq );
297
298     for ( re = rq->rq_gethead( rq ); re != NULL; re = rq->rq_getnext( re )) {
299         if ( re->re_write( NULL, re, fp ) < 0 ) {
300             fflush( fp );
301             rq->rq_unlock( rq );
302             return -1;
303         }
304     }
305     fflush( fp );
306     sglob->srpos = ftell( fp ); /* update replog file position */
307     /* and truncate to correct len */
308     if ( ftruncate( fileno( fp ), sglob->srpos ) < 0 ) {
309         Debug( LDAP_DEBUG_ANY, "Error truncating replication log: %s\n",
310                 sys_errlist[ errno ], 0, 0 );
311     }
312     rq->rq_ndel = 0;    /* reset count of deleted re's */
313     time( &now );
314     rq->rq_lasttrim = now;      /* reset last trim time */
315     rq->rq_unlock( rq );
316     return 0;
317 }
318
319
320
321
322 /*
323  * Check to see if the private slurpd replication log needs trimming.
324  * The current criteria are:
325  *  - The last trim was more than 5 minutes ago, *and*
326  *  - We've finished with at least 50 replication log entries since the
327  *    last time we re-wrote the replication log.
328  *
329  * Return 1 if replogfile should be trimmed, 0 if not.
330  * Any different policy should be implemented by replacing this function.
331  */
332 static int
333 Rq_needtrim(
334     Rq  *rq
335 )
336 {
337     int         rc = 0;
338     Re          *re;
339     int         nzrc = 0;       /* nzrc is count of entries with refcnt == 0 */
340     time_t      now;
341
342     if ( rq == NULL ) {
343         return 0;
344     }
345
346     rq->rq_lock( rq );
347
348     time( &now );
349
350     if ( now > ( rq->rq_lasttrim + TRIMCHECK_INTERVAL )) {
351         rc = ( rq->rq_ndel >= 50 );
352     } else {
353         rc = 0;
354     }
355     rq->rq_unlock( rq );
356     return rc;
357 }
358
359
360 /*
361  * Return counts of Re structs in the queue.
362  */
363 static int
364 Rq_getcount(
365     Rq  *rq,
366     int type
367 )
368 {
369     int count = 0;
370     Re  *re;
371
372     if ( rq == NULL ) {
373         return 0;
374     }
375
376     rq->rq_lock( rq );
377     if ( type == RQ_COUNT_ALL ) {
378         count = rq->rq_nre;
379     } else {
380         for ( re = rq->rq_gethead( rq ); re != NULL;
381                 re = rq->rq_getnext( re )) {
382             if ( type == RQ_COUNT_NZRC ) {
383                 if ( re->re_getrefcnt( re ) > 1 ) {
384                     count++;
385                 }
386             }
387         }
388     }
389     rq->rq_unlock( rq );
390     return count;
391 }
392
393
394
395
396 /* 
397  * Allocate and initialize an Rq object.
398  */
399 int
400 Rq_init(
401     Rq  **rq
402 )
403 {
404     /* Instantiate the struct */
405     (*rq) = (Rq *) malloc( sizeof( Rq ));
406     if ( *rq == NULL ) {
407         return -1;
408     }
409
410     /* Fill in all the function pointers */
411     (*rq)->rq_gethead = Rq_gethead;
412     (*rq)->rq_getnext = Rq_getnext;
413     (*rq)->rq_delhead = Rq_delhead;
414     (*rq)->rq_add = Rq_add;
415     (*rq)->rq_gc = Rq_gc;
416     (*rq)->rq_lock = Rq_lock;
417     (*rq)->rq_unlock = Rq_unlock;
418     (*rq)->rq_dump = Rq_dump;
419     (*rq)->rq_needtrim = Rq_needtrim;
420     (*rq)->rq_write = Rq_write;
421     (*rq)->rq_getcount = Rq_getcount;
422
423     /* Initialize private data */
424     pthread_mutex_init( &((*rq)->rq_mutex), pthread_mutexattr_default );
425     pthread_cond_init( &((*rq)->rq_more), pthread_condattr_default );
426     (*rq)->rq_head = NULL;
427     (*rq)->rq_tail = NULL;
428     (*rq)->rq_nre = 0;
429     (*rq)->rq_ndel = 0;
430     (*rq)->rq_lasttrim = (time_t) 0L;
431
432     return 0;
433 }
434