]> git.sur5r.net Git - openldap/blob - servers/slurpd/ri.c
Don't defer abandon due to pending
[openldap] / servers / slurpd / ri.c
1 /* $OpenLDAP$ */
2 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
3  *
4  * Copyright 1998-2004 The OpenLDAP Foundation.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted only as authorized by the OpenLDAP
9  * Public License.
10  *
11  * A copy of this license is available in file LICENSE in the
12  * top-level directory of the distribution or, alternatively, at
13  * <http://www.OpenLDAP.org/license.html>.
14  */
15 /* Portions Copyright (c) 1996 Regents of the University of Michigan.
16  * All rights reserved.
17  *
18  * Redistribution and use in source and binary forms are permitted
19  * provided that this notice is preserved and that due credit is given
20  * to the University of Michigan at Ann Arbor. The name of the University
21  * may not be used to endorse or promote products derived from this
22  * software without specific prior written permission. This software
23  * is provided ``as is'' without express or implied warranty.
24  */
25 /* ACKNOWLEDGEMENTS:
26  * This work was originally developed by the University of Michigan
27  * (as part of U-MICH LDAP).
28  */
29
30 /*
31  * ri.c - routines used to manipulate Ri structures.  An Ri (Replica
32  * information) struct contains all information about one replica
33  * instance.  The Ri struct is defined in slurp.h
34  */
35
36
37 #include "portable.h"
38
39 #include <stdio.h>
40
41 #include <ac/stdlib.h>
42 #include <ac/string.h>
43 #include <ac/signal.h>
44
45 #include "slurp.h"
46 #include "globals.h"
47
48
49 /* Forward references */
50 static int ismine LDAP_P(( Ri  *, Re  * ));
51 static int isnew LDAP_P(( Ri  *, Re  * ));
52
53
54 /*
55  * Process any unhandled replication entries in the queue.
56  */
57 static int
58 Ri_process(
59     Ri *ri
60 )
61 {
62     Rq          *rq = sglob->rq;
63     Re          *re = NULL, *new_re = NULL;
64     int         rc ;
65     char        *errmsg;
66     int         errfree;
67
68     (void) SIGNAL( LDAP_SIGUSR1, do_nothing );
69 #ifdef SIGPIPE
70     (void) SIGNAL( SIGPIPE, SIG_IGN );
71 #endif
72     if ( ri == NULL ) {
73 #ifdef NEW_LOGGING
74         LDAP_LOG ( SLURPD, ERR, "Ri_process: "
75                 "Error: ri == NULL!\n", 0, 0, 0 );
76 #else
77         Debug( LDAP_DEBUG_ANY, "Error: Ri_process: ri == NULL!\n", 0, 0, 0 );
78 #endif
79         return -1;
80     }
81
82     /*
83      * Startup code.  See if there's any work to do.  If not, wait on the
84      * rq->rq_more condition variable.
85      */
86     rq->rq_lock( rq );
87     while ( !sglob->slurpd_shutdown &&
88             (( re = rq->rq_gethead( rq )) == NULL )) {
89         /* No work */
90         if ( sglob->one_shot_mode ) {
91             /* give up if in one shot mode */
92             rq->rq_unlock( rq );
93             return 0;
94         }
95         /* wait on condition variable */
96         ldap_pvt_thread_cond_wait( &rq->rq_more, &rq->rq_mutex );
97     }
98
99     /*
100      * When we get here, there's work in the queue, and we have the
101      * queue locked.  re should be pointing to the head of the queue.
102      */
103     rq->rq_unlock( rq );
104     while ( !sglob->slurpd_shutdown ) {
105         if ( re != NULL ) {
106             if ( !ismine( ri, re )) {
107                 /* The Re doesn't list my host:port */
108 #ifdef NEW_LOGGING
109                 LDAP_LOG ( SLURPD, DETAIL1, "Ri_process: "
110                         "Replica %s:%d, skip repl record for %s (not mine)\n",
111                         ri->ri_hostname, ri->ri_port, re->re_dn );
112 #else
113                 Debug( LDAP_DEBUG_TRACE,
114                         "Replica %s:%d, skip repl record for %s (not mine)\n",
115                         ri->ri_hostname, ri->ri_port, re->re_dn );
116 #endif
117             } else if ( !isnew( ri, re )) {
118                 /* This Re is older than my saved status information */
119 #ifdef NEW_LOGGING
120                 LDAP_LOG ( SLURPD, DETAIL1, "Ri_process: "
121                         "Replica %s:%d, skip repl record for %s (old)\n",
122                         ri->ri_hostname, ri->ri_port, re->re_dn );
123 #else
124                 Debug( LDAP_DEBUG_TRACE,
125                         "Replica %s:%d, skip repl record for %s (old)\n",
126                         ri->ri_hostname, ri->ri_port, re->re_dn );
127 #endif
128             } else {
129                 rc = do_ldap( ri, re, &errmsg, &errfree );
130                 switch ( rc ) {
131                 case DO_LDAP_ERR_RETRYABLE:
132                     ldap_pvt_thread_sleep( RETRY_SLEEP_TIME );
133 #ifdef NEW_LOGGING
134                         LDAP_LOG ( SLURPD, DETAIL1, "Ri_process: "
135                                 "Retrying operation for DN %s on replica %s:%d\n",
136                             re->re_dn, ri->ri_hostname, ri->ri_port );
137 #else
138                     Debug( LDAP_DEBUG_ANY,
139                             "Retrying operation for DN %s on replica %s:%d\n",
140                             re->re_dn, ri->ri_hostname, ri->ri_port );
141 #endif
142                     continue;
143                     break;
144                 case DO_LDAP_ERR_FATAL: {
145                     /* Non-retryable error.  Write rejection log. */
146                         int ld_errno = 0;
147                         ldap_get_option(ri->ri_ldp, LDAP_OPT_ERROR_NUMBER, &ld_errno);
148                     write_reject( ri, re, ld_errno, errmsg );
149                     /* Update status ... */
150                     (void) sglob->st->st_update( sglob->st, ri->ri_stel, re );
151                     /* ... and write to disk */
152                     (void) sglob->st->st_write( sglob->st );
153                     } break;
154                 default:
155                     /* LDAP op completed ok - update status... */
156                     (void) sglob->st->st_update( sglob->st, ri->ri_stel, re );
157                     /* ... and write to disk */
158                     (void) sglob->st->st_write( sglob->st );
159                     break;
160                 }
161                 if ( errfree && errmsg ) {
162                     ch_free( errmsg );
163                 }
164             }
165         } else {
166 #ifdef NEW_LOGGING
167                 LDAP_LOG ( SLURPD, ERR, "Ri_process: "
168                         "Error: re is null in Ri_process\n", 0, 0, 0 );
169 #else
170             Debug( LDAP_DEBUG_ANY, "Error: re is null in Ri_process\n",
171                     0, 0, 0 );
172 #endif
173         }
174         rq->rq_lock( rq );
175         while ( !sglob->slurpd_shutdown &&
176                 ((new_re = re->re_getnext( re )) == NULL )) {
177             if ( sglob->one_shot_mode ) {
178                 rq->rq_unlock( rq );
179                 return 0;
180             }
181             /* No work - wait on condition variable */
182             ldap_pvt_thread_cond_wait( &rq->rq_more, &rq->rq_mutex );
183         }
184         re->re_decrefcnt( re );
185         re = new_re;
186         rq->rq_unlock( rq );
187         if ( sglob->slurpd_shutdown ) {
188             if ( ri->ri_ldp ) {
189                 ldap_unbind( ri->ri_ldp );
190                 ri->ri_ldp = NULL;
191             }
192             return 0;
193         }
194     }
195     return 0;
196 }
197
198
199 /*
200  * Wake a replication thread which may be sleeping.
201  * Send it a LDAP_SIGUSR1.
202  */
203 static void
204 Ri_wake(
205     Ri *ri
206
207 {
208     if ( ri == NULL ) {
209         return;
210     }
211     ldap_pvt_thread_kill( ri->ri_tid, LDAP_SIGUSR1 );
212 }
213
214
215
216 /* 
217  * Allocate and initialize an Ri struct.
218  */
219 int
220 Ri_init(
221     Ri  **ri
222 )
223 {
224     (*ri) = ( Ri * ) calloc( 1, sizeof( Ri ));
225     if ( *ri == NULL ) {
226         return -1;
227     }
228
229     /* Initialize member functions */
230     (*ri)->ri_process = Ri_process;
231     (*ri)->ri_wake = Ri_wake;
232
233     /* Initialize private data */
234     (*ri)->ri_hostname = NULL;
235     (*ri)->ri_uri = NULL;
236     (*ri)->ri_ldp = NULL;
237     (*ri)->ri_bind_dn = NULL;
238     (*ri)->ri_password = NULL;
239     (*ri)->ri_authcId = NULL;
240     (*ri)->ri_srvtab = NULL;
241     (*ri)->ri_curr = NULL;
242
243     return 0;
244 }
245
246
247
248
249 /*
250  * Return 1 if the hostname and port in re match the hostname and port
251  * in ri, otherwise return zero.
252  */
253 static int
254 ismine(
255     Ri  *ri,
256     Re  *re
257 )
258 {
259     Rh  *rh;
260     int i;
261
262     if ( ri == NULL || re == NULL || ri->ri_hostname == NULL ||
263             re->re_replicas == NULL ) {
264         return 0;
265     }
266     rh = re->re_replicas;
267     for ( i = 0; rh[ i ].rh_hostname != NULL; i++ ) {
268         if ( !strcmp( rh[ i ].rh_hostname, ri->ri_hostname) &&
269                 rh[ i ].rh_port == ri->ri_port ) {
270             return 1;
271         }
272     }
273     return 0;
274 }
275
276
277
278
279 /*
280  * Return 1 if the Re's timestamp/seq combination are greater than the
281  * timestamp and seq in the Ri's ri_stel member.  In other words, if we
282  * find replication entries in the log which we've already processed,
283  * don't process them.  If the re is "old," return 0.
284  * No check for NULL pointers is done.
285  */
286 static int
287 isnew(
288     Ri  *ri,
289     Re  *re
290 )
291 {
292     long x;
293     int ret;
294
295     /* Lock the St struct to avoid a race */
296     sglob->st->st_lock( sglob->st );
297     x = re->re_timestamp - ri->ri_stel->last;
298     if ( x > 0 ) {
299         /* re timestamp is newer */
300         ret = 1;
301     } else if ( x < 0 ) {
302         ret = 0;
303     } else {
304         /* timestamps were equal */
305         if ( re->re_seq > ri->ri_stel->seq ) {
306             /* re seq is newer */
307             ret = 1;
308         } else {
309             ret = 0;
310         }
311     }
312     sglob->st->st_unlock( sglob->st );
313     return ret;
314 }