]> git.sur5r.net Git - openldap/blob - servers/slurpd/rq.c
783b15ba2d7536a2a58635cb72d44667b96210c0
[openldap] / servers / slurpd / rq.c
1 /*
2  * Copyright (c) 1996 Regents of the University of Michigan.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms are permitted
6  * provided that this notice is preserved and that due credit is given
7  * to the University of Michigan at Ann Arbor. The name of the University
8  * may not be used to endorse or promote products derived from this
9  * software without specific prior written permission. This software
10  * is provided ``as is'' without express or implied warranty.
11  */
12
13 /*
14  * rq.c - routines used to manage the queue of replication entries.
15  * An Rq (Replication queue) struct contains a linked list of Re
16  * (Replication entry) structures.
17  *
18  * Routines wishing to access the replication queue should do so through
19  * the Rq struct's member functions, e.g. rq->rq_gethead() and friends.
20  * For example, Re structs should be added to the queue by calling 
21  * the rq_add() member function.
22  *
23  * Access to the queue is serialized by a mutex.  Member functions which do
24  * not do their own locking should only be called after locking the queue
25  * using the rq_lock() member function.  The queue should be unlocked with
26  * the rq_unlock() member function.
27  *
28  * Note that some member functions handle their own locking internally.
29  * Callers should not lock the queue before calling these functions.
30  * See the comment block for each function below.
31  *
32  */
33
34 #include "portable.h"
35
36 #include <stdio.h>
37 #include <stdlib.h>
38
39 #include "slurp.h"
40 #include "globals.h"
41
42
43 /* externs */
44 extern void Re_dump LDAP_P(( Re *re ));
45
46 /*
47  * Lock the replication queue.
48  */
49 static int
50 Rq_lock(
51     Rq  *rq
52 )
53 {
54     return( pthread_mutex_lock( &rq->rq_mutex ));
55 }
56
57
58 /*
59  * Unlock the replication queue.
60  */
61 static int
62 Rq_unlock(
63     Rq  *rq
64 )
65 {
66     return( pthread_mutex_unlock( &rq->rq_mutex ));
67 }
68
69
70
71 /*
72  * Return the head of the queue.  Callers should lock the queue before
73  * calling this routine.
74  */
75 static Re *
76 Rq_gethead(
77     Rq  *rq
78 )
79 {
80     return( rq == NULL ? NULL : rq->rq_head );
81 }
82
83
84 /*
85  * Return the next item in the queue.  Callers should lock the queue before
86  * calling this routine.
87  */
88 static Re *
89 Rq_getnext(
90     Re  *re
91 )
92 {
93     if ( re == NULL ) {
94         return NULL;
95     } else {
96         return( re->re_getnext( re ));
97     }
98 }
99
100
101 /*
102  * Delete the item at the head of the list.  The queue should be locked
103  * by the caller before calling this routine.
104  */
105 static int
106 Rq_delhead(
107     Rq  *rq
108 )
109 {
110     Re  *savedhead;
111     int rc;
112
113     if ( rq == NULL ) {
114         return( -1 );
115     }
116
117     savedhead = rq->rq_head;
118     if ( savedhead == NULL ) {
119         return( 0 );
120     }
121
122     if ( savedhead->re_getrefcnt( savedhead ) != 0 ) {
123         Debug( LDAP_DEBUG_ANY, "Warning: attempt to delete when refcnt != 0\n",
124                 0, 0, 0 );
125         return( -1 );
126     }
127
128     rq->rq_head = rq->rq_head->re_getnext( rq->rq_head );
129     rc = savedhead->re_free( savedhead );
130     rq->rq_nre--;       /* decrement count of Re's in queue */
131     return( rc );
132 }
133
134
135 /* 
136  * Add an entry to the tail of the replication queue.  Locking is handled
137  * internally.  When items are added to the queue, this routine wakes
138  * up any threads which are waiting for more work by signaling on the
139  * rq->rq_more condition variable.
140  */
141 static int
142 Rq_add(
143     Rq          *rq,
144     char        *buf
145 )
146 {
147     Re  *re;
148     int wasempty = 0;
149
150     /* Lock the queue */
151     rq->rq_lock( rq );
152
153     /* Create a new Re */
154     if ( Re_init( &re ) < 0 ) {
155         rq->rq_unlock( rq );
156         return -1;
157     }
158
159     /* parse buf and fill in the re struct */
160     if ( re->re_parse( re, buf ) < 0 ) {
161         re->re_free( re );
162         rq->rq_unlock( rq );
163         return -1;
164     }
165
166     /* Insert into queue */
167     if ( rq->rq_head == NULL ) {
168         rq->rq_head = re;
169         rq->rq_tail = re;
170         wasempty = 1;
171     } else {
172         rq->rq_tail->re_next = re;
173     }
174
175     /* set the sequence number */
176     re->re_seq = 0;
177     if ( !wasempty && !strcmp(rq->rq_tail->re_timestamp, re->re_timestamp )) {
178         /*
179          * Our new re has the same timestamp as the tail's timestamp.
180          * Increment the seq number in the tail and use it as our seq number.
181          */
182         re->re_seq = rq->rq_tail->re_seq + 1;
183     }
184     rq->rq_tail = re;
185
186     /* Increment count of items in queue */
187     rq->rq_nre++;
188     /* wake up any threads waiting for more work */
189     pthread_cond_broadcast( &rq->rq_more );
190
191     /* ... and unlock the queue */
192     rq->rq_unlock( rq );
193
194     return 0;
195 }
196
197
198 /*
199  * Garbage-collect the replication queue.  Locking is handled internally.
200  */
201 static void
202 Rq_gc(
203     Rq  *rq
204 )
205 {
206     if ( rq == NULL ) {
207         Debug( LDAP_DEBUG_ANY, "Rq_gc: rq is NULL!\n", 0, 0, 0 );
208         return;
209     }
210     rq->rq_lock( rq ); 
211     while (( rq->rq_head != NULL ) &&
212             ( rq->rq_head->re_getrefcnt( rq->rq_head ) == 0 )) {
213         rq->rq_delhead( rq );
214         rq->rq_ndel++;  /* increment count of deleted entries */
215     }
216     rq->rq_unlock( rq ); 
217     return;
218 }
219
220
221 /*
222  * For debugging: dump the contents of the replication queue to a file.
223  * Locking is handled internally.
224  */
225 static void
226 Rq_dump(
227     Rq  *rq
228 )
229 {
230     Re          *re;
231     FILE        *fp;
232
233     if ( rq == NULL ) {
234         Debug( LDAP_DEBUG_ANY, "Rq_dump: rq is NULL!\n", 0, 0, 0 );
235         return;
236     }
237
238     if (( fp = fopen( SLURPD_DUMPFILE, "w" )) == NULL ) {
239         Debug( LDAP_DEBUG_ANY, "Rq_dump: cannot open \"%s\" for write\n",
240                 SLURPD_DUMPFILE, 0, 0 );
241         return;
242     }
243
244     rq->rq_lock( rq );
245     for ( re = rq->rq_gethead( rq ); re != NULL; re = rq->rq_getnext( re )) {
246         re->re_dump( re, fp );
247     }
248     rq->rq_unlock( rq );
249     fclose( fp );
250     return;
251 }
252
253
254 /*
255  * Write the contents of a replication queue to a file.  Returns zero if
256  * successful, -1 if not.  Handles queue locking internally.  Callers should
257  * provide an open file pointer, which should refer to a locked file.
258  */
259 static int
260 Rq_write(
261     Rq          *rq,
262     FILE        *fp
263 )
264 {
265     Re          *re;
266     time_t      now;
267
268     if ( rq == NULL ) {
269         return -1;
270     }
271
272     Debug( LDAP_DEBUG_ARGS, "re-write on-disk replication log\n",
273             0, 0, 0 );
274 #ifndef SEEK_SET
275 #define SEEK_SET 0
276 #endif
277     fseek( fp, 0L, SEEK_SET );  /* Go to beginning of file */
278     rq->rq_lock( rq );
279
280     for ( re = rq->rq_gethead( rq ); re != NULL; re = rq->rq_getnext( re )) {
281         if ( re->re_write( NULL, re, fp ) < 0 ) {
282             fflush( fp );
283             rq->rq_unlock( rq );
284             return -1;
285         }
286     }
287     fflush( fp );
288     sglob->srpos = ftell( fp ); /* update replog file position */
289     /* and truncate to correct len */
290     if ( ftruncate( fileno( fp ), sglob->srpos ) < 0 ) {
291         Debug( LDAP_DEBUG_ANY, "Error truncating replication log: %s\n",
292                 sys_errlist[ errno ], 0, 0 );
293     }
294     rq->rq_ndel = 0;    /* reset count of deleted re's */
295     time( &now );
296     rq->rq_lasttrim = now;      /* reset last trim time */
297     rq->rq_unlock( rq );
298     return 0;
299 }
300
301
302 /*
303  * Check to see if the private slurpd replication log needs trimming.
304  * The current criteria are:
305  *  - The last trim was more than 5 minutes ago, *and*
306  *  - We've finished with at least 50 replication log entries since the
307  *    last time we re-wrote the replication log.
308  *
309  * Return 1 if replogfile should be trimmed, 0 if not.
310  * Any different policy should be implemented by replacing this function.
311  */
312 static int
313 Rq_needtrim(
314     Rq  *rq
315 )
316 {
317     int         rc = 0;
318     Re          *re;
319     int         nzrc = 0;       /* nzrc is count of entries with refcnt == 0 */
320     time_t      now;
321
322     if ( rq == NULL ) {
323         return 0;
324     }
325
326     rq->rq_lock( rq );
327
328     time( &now );
329
330     if ( now > ( rq->rq_lasttrim + TRIMCHECK_INTERVAL )) {
331         rc = ( rq->rq_ndel >= 50 );
332     } else {
333         rc = 0;
334     }
335     rq->rq_unlock( rq );
336     return rc;
337 }
338
339
340 /*
341  * Return counts of Re structs in the queue.
342  */
343 static int
344 Rq_getcount(
345     Rq  *rq,
346     int type
347 )
348 {
349     int count = 0;
350     Re  *re;
351
352     if ( rq == NULL ) {
353         return 0;
354     }
355
356     rq->rq_lock( rq );
357     if ( type == RQ_COUNT_ALL ) {
358         count = rq->rq_nre;
359     } else {
360         for ( re = rq->rq_gethead( rq ); re != NULL;
361                 re = rq->rq_getnext( re )) {
362             if ( type == RQ_COUNT_NZRC ) {
363                 if ( re->re_getrefcnt( re ) > 1 ) {
364                     count++;
365                 }
366             }
367         }
368     }
369     rq->rq_unlock( rq );
370     return count;
371 }
372
373
374 /* 
375  * Allocate and initialize an Rq object.
376  */
377 int
378 Rq_init(
379     Rq  **rq
380 )
381 {
382     /* Instantiate the struct */
383     (*rq) = (Rq *) malloc( sizeof( Rq ));
384     if ( *rq == NULL ) {
385         return -1;
386     }
387
388     /* Fill in all the function pointers */
389     (*rq)->rq_gethead = Rq_gethead;
390     (*rq)->rq_getnext = Rq_getnext;
391     (*rq)->rq_delhead = Rq_delhead;
392     (*rq)->rq_add = Rq_add;
393     (*rq)->rq_gc = Rq_gc;
394     (*rq)->rq_lock = Rq_lock;
395     (*rq)->rq_unlock = Rq_unlock;
396     (*rq)->rq_dump = Rq_dump;
397     (*rq)->rq_needtrim = Rq_needtrim;
398     (*rq)->rq_write = Rq_write;
399     (*rq)->rq_getcount = Rq_getcount;
400
401     /* Initialize private data */
402     pthread_mutex_init( &((*rq)->rq_mutex), pthread_mutexattr_default );
403     pthread_cond_init( &((*rq)->rq_more), pthread_condattr_default );
404     (*rq)->rq_head = NULL;
405     (*rq)->rq_tail = NULL;
406     (*rq)->rq_nre = 0;
407     (*rq)->rq_ndel = 0;
408     (*rq)->rq_lasttrim = (time_t) 0L;
409
410     return 0;
411 }
412