]> git.sur5r.net Git - openldap/blob - servers/slurpd/rq.c
Coverted LDAP_LOG macro to use subsystem ID int values instead of string values
[openldap] / servers / slurpd / rq.c
1 /* $OpenLDAP$ */
2 /*
3  * Copyright 1998-2002 The OpenLDAP Foundation, All Rights Reserved.
4  * COPYING RESTRICTIONS APPLY, see COPYRIGHT file
5  */
6 /*
7  * Copyright (c) 1996 Regents of the University of Michigan.
8  * All rights reserved.
9  *
10  * Redistribution and use in source and binary forms are permitted
11  * provided that this notice is preserved and that due credit is given
12  * to the University of Michigan at Ann Arbor. The name of the University
13  * may not be used to endorse or promote products derived from this
14  * software without specific prior written permission. This software
15  * is provided ``as is'' without express or implied warranty.
16  */
17
18 /*
19  * rq.c - routines used to manage the queue of replication entries.
20  * An Rq (Replication queue) struct contains a linked list of Re
21  * (Replication entry) structures.
22  *
23  * Routines wishing to access the replication queue should do so through
24  * the Rq struct's member functions, e.g. rq->rq_gethead() and friends.
25  * For example, Re structs should be added to the queue by calling 
26  * the rq_add() member function.
27  *
28  * Access to the queue is serialized by a mutex.  Member functions which do
29  * not do their own locking should only be called after locking the queue
30  * using the rq_lock() member function.  The queue should be unlocked with
31  * the rq_unlock() member function.
32  *
33  * Note that some member functions handle their own locking internally.
34  * Callers should not lock the queue before calling these functions.
35  * See the comment block for each function below.
36  *
37  */
38
39 #include "portable.h"
40
41 #include <stdio.h>
42 #include <sys/stat.h>
43
44 #include <ac/stdlib.h>
45 #include <ac/string.h>
46 #include <ac/unistd.h>          /* get ftruncate() */
47
48 #ifdef HAVE_SYS_TYPES_H
49 #include <sys/types.h>
50 #endif
51 #ifdef HAVE_FCNTL_H
52 #include <fcntl.h>
53 #endif
54
55 #include "slurp.h"
56 #include "globals.h"
57
58 /*
59  * Lock the replication queue.
60  */
61 static int
62 Rq_lock(
63     Rq  *rq
64 )
65 {
66     return( ldap_pvt_thread_mutex_lock( &rq->rq_mutex ));
67 }
68
69
70 /*
71  * Unlock the replication queue.
72  */
73 static int
74 Rq_unlock(
75     Rq  *rq
76 )
77 {
78     return( ldap_pvt_thread_mutex_unlock( &rq->rq_mutex ));
79 }
80
81
82
83 /*
84  * Return the head of the queue.  Callers should lock the queue before
85  * calling this routine.
86  */
87 static Re *
88 Rq_gethead(
89     Rq  *rq
90 )
91 {
92     return( rq == NULL ? NULL : rq->rq_head );
93 }
94
95
96 /*
97  * Return the next item in the queue.  Callers should lock the queue before
98  * calling this routine.
99  */
100 static Re *
101 Rq_getnext(
102     Re  *re
103 )
104 {
105     if ( re == NULL ) {
106         return NULL;
107     } else {
108         return( re->re_getnext( re ));
109     }
110 }
111
112
113 /*
114  * Delete the item at the head of the list.  The queue should be locked
115  * by the caller before calling this routine.
116  */
117 static int
118 Rq_delhead(
119     Rq  *rq
120 )
121 {
122     Re  *savedhead;
123     int rc;
124
125     if ( rq == NULL ) {
126         return( -1 );
127     }
128
129     savedhead = rq->rq_head;
130     if ( savedhead == NULL ) {
131         return( 0 );
132     }
133
134     if ( savedhead->re_getrefcnt( savedhead ) != 0 ) {
135 #ifdef NEW_LOGGING
136         LDAP_LOG ( SLURPD, WARNING, "Rq_delhead: "
137                 "Warning: attempt to delete when refcnt != 0\n", 0, 0, 0 );
138 #else
139         Debug( LDAP_DEBUG_ANY, "Warning: attempt to delete when refcnt != 0\n",
140                 0, 0, 0 );
141 #endif
142         return( -1 );
143     }
144
145     rq->rq_head = rq->rq_head->re_getnext( rq->rq_head );
146     rc = savedhead->re_free( savedhead );
147     rq->rq_nre--;       /* decrement count of Re's in queue */
148     return( rc );
149 }
150
151
152 /* 
153  * Add an entry to the tail of the replication queue.  Locking is handled
154  * internally.  When items are added to the queue, this routine wakes
155  * up any threads which are waiting for more work by signaling on the
156  * rq->rq_more condition variable.
157  */
158 static int
159 Rq_add(
160     Rq          *rq,
161     char        *buf
162 )
163 {
164     Re  *re;
165     int wasempty = 0;
166
167     /* Lock the queue */
168     rq->rq_lock( rq );
169
170     /* Create a new Re */
171     if ( Re_init( &re ) < 0 ) {
172         rq->rq_unlock( rq );
173         return -1;
174     }
175
176     /* parse buf and fill in the re struct */
177     if ( re->re_parse( re, buf ) < 0 ) {
178         re->re_free( re );
179         rq->rq_unlock( rq );
180         return -1;
181     }
182
183     /* Insert into queue */
184     if ( rq->rq_head == NULL ) {
185         rq->rq_head = re;
186         rq->rq_tail = re;
187         wasempty = 1;
188     } else {
189         rq->rq_tail->re_next = re;
190     }
191
192     /* set the sequence number */
193     re->re_seq = 0;
194     if ( !wasempty && ( rq->rq_tail->re_timestamp == re->re_timestamp )) {
195         /*
196          * Our new re has the same timestamp as the tail's timestamp.
197          * Increment the seq number in the tail and use it as our seq number.
198          */
199         re->re_seq = rq->rq_tail->re_seq + 1;
200     }
201     rq->rq_tail = re;
202
203     /* Increment count of items in queue */
204     rq->rq_nre++;
205     /* wake up any threads waiting for more work */
206     ldap_pvt_thread_cond_broadcast( &rq->rq_more );
207
208     /* ... and unlock the queue */
209     rq->rq_unlock( rq );
210
211     return 0;
212 }
213
214
215 /*
216  * Garbage-collect the replication queue.  Locking is handled internally.
217  */
218 static void
219 Rq_gc(
220     Rq  *rq
221 )
222 {
223     if ( rq == NULL ) {
224 #ifdef NEW_LOGGING
225         LDAP_LOG ( SLURPD, DETAIL1, "Rq_gc: rq is NULL!\n", 0, 0, 0 );
226 #else
227         Debug( LDAP_DEBUG_ANY, "Rq_gc: rq is NULL!\n", 0, 0, 0 );
228 #endif
229         return;
230     }
231     rq->rq_lock( rq ); 
232     while (( rq->rq_head != NULL ) &&
233             ( rq->rq_head->re_getrefcnt( rq->rq_head ) == 0 )) {
234         rq->rq_delhead( rq );
235         rq->rq_ndel++;  /* increment count of deleted entries */
236     }
237     rq->rq_unlock( rq ); 
238     return;
239 }
240
241
242 /*
243  * For debugging: dump the contents of the replication queue to a file.
244  * Locking is handled internally.
245  */
246 static void
247 Rq_dump(
248     Rq  *rq
249 )
250 {
251     Re          *re;
252     FILE        *fp;
253     int         tmpfd;
254
255     if ( rq == NULL ) {
256 #ifdef NEW_LOGGING
257         LDAP_LOG ( SLURPD, ARGS, "Rq_dump: rq is NULL!\n", 0, 0, 0 );
258 #else
259         Debug( LDAP_DEBUG_ANY, "Rq_dump: rq is NULL!\n", 0, 0, 0 );
260 #endif
261         return;
262     }
263
264     if (unlink(SLURPD_DUMPFILE) == -1 && errno != ENOENT) {
265 #ifdef NEW_LOGGING
266         LDAP_LOG ( SLURPD, ERR, "Rq_dump: "
267                 "\"%s\" exists, cannot unlink\n", SLURPD_DUMPFILE, 0, 0 );
268 #else
269         Debug( LDAP_DEBUG_ANY, "Rq_dump: \"%s\" exists, and cannot unlink\n",
270                 SLURPD_DUMPFILE, 0, 0 );
271 #endif
272         return;
273     }
274     if (( tmpfd = open(SLURPD_DUMPFILE, O_CREAT|O_RDWR|O_EXCL, 0600)) == -1) {
275 #ifdef NEW_LOGGING
276         LDAP_LOG ( SLURPD, ERR, "Rq_dump: "
277                 "cannot open \"%s\" for write\n", SLURPD_DUMPFILE, 0, 0 );
278 #else
279         Debug( LDAP_DEBUG_ANY, "Rq_dump: cannot open \"%s\" for write\n",
280                 SLURPD_DUMPFILE, 0, 0 );
281 #endif
282         return;
283     }
284     if (( fp = fdopen( tmpfd, "w" )) == NULL ) {
285 #ifdef NEW_LOGGING
286         LDAP_LOG ( SLURPD, ERR, "Rq_dump: "
287                 "cannot fdopen \"%s\" for write\n", SLURPD_DUMPFILE, 0, 0 );
288 #else
289         Debug( LDAP_DEBUG_ANY, "Rq_dump: cannot fdopen \"%s\" for write\n",
290                 SLURPD_DUMPFILE, 0, 0 );
291 #endif
292         return;
293     }
294
295     rq->rq_lock( rq );
296     for ( re = rq->rq_gethead( rq ); re != NULL; re = rq->rq_getnext( re )) {
297         re->re_dump( re, fp );
298     }
299     rq->rq_unlock( rq );
300     fclose( fp );
301     return;
302 }
303
304
305 /*
306  * Write the contents of a replication queue to a file.  Returns zero if
307  * successful, -1 if not.  Handles queue locking internally.  Callers should
308  * provide an open file pointer, which should refer to a locked file.
309  */
310 static int
311 Rq_write(
312     Rq          *rq,
313     FILE        *fp
314 )
315 {
316     Re          *re;
317     time_t      now;
318
319     if ( rq == NULL ) {
320         return -1;
321     }
322
323 #ifdef NEW_LOGGING
324         LDAP_LOG ( SLURPD, ENTRY, "Rq_write: "
325                 "re-write on-disk replication log\n", 0, 0, 0 );
326 #else
327     Debug( LDAP_DEBUG_ARGS, "re-write on-disk replication log\n",
328             0, 0, 0 );
329 #endif
330 #ifndef SEEK_SET
331 #define SEEK_SET 0
332 #endif
333     fseek( fp, 0L, SEEK_SET );  /* Go to beginning of file */
334     rq->rq_lock( rq );
335
336     for ( re = rq->rq_gethead( rq ); re != NULL; re = rq->rq_getnext( re )) {
337         if ( re->re_write( NULL, re, fp ) < 0 ) {
338             fflush( fp );
339             rq->rq_unlock( rq );
340             return -1;
341         }
342     }
343     fflush( fp );
344     sglob->srpos = ftell( fp ); /* update replog file position */
345     /* and truncate to correct len */
346     if ( ftruncate( fileno( fp ), sglob->srpos ) < 0 ) {
347 #ifdef NEW_LOGGING
348         LDAP_LOG ( SLURPD, ERR, "Rq_write: "
349                 "Error truncating replication log: %s\n", sys_errlist[ errno ], 0, 0 );
350 #else
351         Debug( LDAP_DEBUG_ANY, "Error truncating replication log: %s\n",
352                 sys_errlist[ errno ], 0, 0 );
353 #endif
354     }
355     rq->rq_ndel = 0;    /* reset count of deleted re's */
356     time( &now );
357     rq->rq_lasttrim = now;      /* reset last trim time */
358     rq->rq_unlock( rq );
359     return 0;
360 }
361
362
363 /*
364  * Check to see if the private slurpd replication log needs trimming.
365  * The current criteria are:
366  *  - The last trim was more than 5 minutes ago, *and*
367  *  - We've finished with at least 50 replication log entries since the
368  *    last time we re-wrote the replication log.
369  *
370  * Return 1 if replogfile should be trimmed, 0 if not.
371  * Any different policy should be implemented by replacing this function.
372  */
373 static int
374 Rq_needtrim(
375     Rq  *rq
376 )
377 {
378     int         rc = 0;
379     time_t      now;
380
381     if ( rq == NULL ) {
382         return 0;
383     }
384
385     rq->rq_lock( rq );
386
387     time( &now );
388
389     if ( now > ( rq->rq_lasttrim + TRIMCHECK_INTERVAL )) {
390         rc = ( rq->rq_ndel >= 50 );
391     } else {
392         rc = 0;
393     }
394     rq->rq_unlock( rq );
395     return rc;
396 }
397
398
399 /*
400  * Return counts of Re structs in the queue.
401  */
402 static int
403 Rq_getcount(
404     Rq  *rq,
405     int type
406 )
407 {
408     int count = 0;
409     Re  *re;
410
411     if ( rq == NULL ) {
412         return 0;
413     }
414
415     rq->rq_lock( rq );
416     if ( type == RQ_COUNT_ALL ) {
417         count = rq->rq_nre;
418     } else {
419         for ( re = rq->rq_gethead( rq ); re != NULL;
420                 re = rq->rq_getnext( re )) {
421             if ( type == RQ_COUNT_NZRC ) {
422                 if ( re->re_getrefcnt( re ) > 0 ) {
423                     count++;
424                 }
425             }
426         }
427     }
428     rq->rq_unlock( rq );
429     return count;
430 }
431
432
433 /* 
434  * Allocate and initialize an Rq object.
435  */
436 int
437 Rq_init(
438     Rq  **rq
439 )
440 {
441     /* Instantiate the struct */
442     (*rq) = (Rq *) malloc( sizeof( Rq ));
443     if ( *rq == NULL ) {
444         return -1;
445     }
446
447     /* Fill in all the function pointers */
448     (*rq)->rq_gethead = Rq_gethead;
449     (*rq)->rq_getnext = Rq_getnext;
450     (*rq)->rq_delhead = Rq_delhead;
451     (*rq)->rq_add = Rq_add;
452     (*rq)->rq_gc = Rq_gc;
453     (*rq)->rq_lock = Rq_lock;
454     (*rq)->rq_unlock = Rq_unlock;
455     (*rq)->rq_dump = Rq_dump;
456     (*rq)->rq_needtrim = Rq_needtrim;
457     (*rq)->rq_write = Rq_write;
458     (*rq)->rq_getcount = Rq_getcount;
459
460     /* Initialize private data */
461     ldap_pvt_thread_mutex_init( &((*rq)->rq_mutex) );
462     ldap_pvt_thread_cond_init( &((*rq)->rq_more) );
463     (*rq)->rq_head = NULL;
464     (*rq)->rq_tail = NULL;
465     (*rq)->rq_nre = 0;
466     (*rq)->rq_ndel = 0;
467     (*rq)->rq_lasttrim = (time_t) 0L;
468
469     return 0;
470 }