From 8d666c3ee2245555eca39c7592f4447ffa307b7e Mon Sep 17 00:00:00 2001 From: Quanah Gibson-Mount Date: Fri, 13 Mar 2009 20:52:28 +0000 Subject: [PATCH] ITS#5985 --- CHANGES | 1 + servers/slapd/overlays/syncprov.c | 94 +++++++++---------------------- 2 files changed, 28 insertions(+), 67 deletions(-) diff --git a/CHANGES b/CHANGES index 55f828ed5e..31d720de38 100644 --- a/CHANGES +++ b/CHANGES @@ -21,6 +21,7 @@ OpenLDAP 2.4.16 Engineering Fixed slapo-dynlist conversion to cn=config (ITS#6002) Fixed slapo-syncprov newCookie sync messages (ITS#5972) Fixed slapd-syncprov too many MMR messages (ITS#6020) + Fixed slapo-syncprov replica lockout (ITS#5985) Build Environment Cleaned up alloc/free functions for Windows (ITS#6005) Documentation diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index 345bcb251c..f567f9d8bc 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -71,12 +71,11 @@ typedef struct syncops { #define PS_WROTE_BASE 0x04 #define PS_FIND_BASE 0x08 #define PS_FIX_FILTER 0x10 +#define PS_TASK_QUEUED 0x20 int s_inuse; /* reference count */ struct syncres *s_res; struct syncres *s_restail; - struct re_s *s_qtask; /* task for playing psearch responses */ -#define RUNQ_INTERVAL 36000 /* a long time */ ldap_pvt_thread_mutex_t s_mutex; } syncops; @@ -748,13 +747,6 @@ syncprov_free_syncop( syncops *so ) ldap_pvt_thread_mutex_unlock( &so->s_mutex ); return; } - if ( so->s_qtask ) { - ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); - if ( ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) ) - ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask ); - ldap_pvt_runqueue_remove( &slapd_rq, so->s_qtask ); - ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); - } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); if ( so->s_flags & PS_IS_DETACHED ) { filter_free( so->s_op->ors_filter ); @@ -861,11 +853,13 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, return rs.sr_err; } +static void +syncprov_qstart( syncops *so ); + /* Play back queued responses */ static int -syncprov_qplay( Operation *op, struct re_s *rtask ) +syncprov_qplay( Operation *op, syncops *so ) { - syncops *so = rtask->arg; slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key; syncres *sr; Entry *e; @@ -874,7 +868,7 @@ syncprov_qplay( Operation *op, struct re_s *rtask ) opc.son = on; - for (;;) { + do { ldap_pvt_thread_mutex_lock( &so->s_mutex ); sr = so->s_res; if ( sr ) @@ -918,37 +912,31 @@ syncprov_qplay( Operation *op, struct re_s *rtask ) ch_free( sr ); - if ( rc ) { - /* Exit loop with mutex held */ - ldap_pvt_thread_mutex_lock( &so->s_mutex ); - break; - } - } + /* Exit loop with mutex held */ + ldap_pvt_thread_mutex_lock( &so->s_mutex ); - /* wait until we get explicitly scheduled again */ - ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); - ldap_pvt_runqueue_stoptask( &slapd_rq, rtask ); - if ( rc == 0 ) { - ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 ); - } else { - /* bail out on any error */ - ldap_pvt_runqueue_remove( &slapd_rq, rtask ); + } while (0); + + /* We now only send one change at a time, to prevent one + * psearch from hogging all the CPU. Resubmit this task if + * there are more responses queued and no errors occurred. + */ - /* Prevent duplicate remove */ - if ( so->s_qtask == rtask ) - so->s_qtask = NULL; + if ( rc == 0 && so->s_res ) { + syncprov_qstart( so ); + } else { + so->s_flags ^= PS_TASK_QUEUED; } - ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); + ldap_pvt_thread_mutex_unlock( &so->s_mutex ); return rc; } -/* runqueue task for playing back queued responses */ +/* task for playing back queued responses */ static void * syncprov_qtask( void *ctx, void *arg ) { - struct re_s *rtask = arg; - syncops *so = rtask->arg; + syncops *so = arg; OperationBuffer opbuf; Operation *op; BackendDB be; @@ -973,22 +961,11 @@ syncprov_qtask( void *ctx, void *arg ) LDAP_SLIST_FIRST(&op->o_extra) = NULL; op->o_callback = NULL; - rc = syncprov_qplay( op, rtask ); + rc = syncprov_qplay( op, so ); /* decrement use count... */ syncprov_free_syncop( so ); -#if 0 /* FIXME: connection_close isn't exported from slapd. - * should it be? - */ - if ( rc ) { - ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex ); - if ( connection_state_closing( op->o_conn )) { - connection_close( op->o_conn ); - } - ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex ); - } -#endif return NULL; } @@ -996,27 +973,10 @@ syncprov_qtask( void *ctx, void *arg ) static void syncprov_qstart( syncops *so ) { - int wake=0; - ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); - if ( !so->s_qtask ) { - so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL, - syncprov_qtask, so, "syncprov_qtask", - so->s_op->o_conn->c_peer_name.bv_val ); - ++so->s_inuse; - wake = 1; - } else { - if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) && - !so->s_qtask->next_sched.tv_sec ) { - so->s_qtask->interval.tv_sec = 0; - ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 ); - so->s_qtask->interval.tv_sec = RUNQ_INTERVAL; - ++so->s_inuse; - wake = 1; - } - } - ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); - if ( wake ) - slap_wake_listener(); + so->s_flags |= PS_TASK_QUEUED; + so->s_inuse++; + ldap_pvt_thread_pool_submit( &connection_pool, + syncprov_qtask, so ); } /* Queue a persistent search response */ @@ -1076,7 +1036,7 @@ syncprov_qresp( opcookie *opc, syncops *so, int mode ) so->s_flags ^= PS_WROTE_BASE; so->s_flags |= PS_FIND_BASE; } - if ( so->s_flags & PS_IS_DETACHED ) { + if (( so->s_flags & (PS_IS_DETACHED|PS_TASK_QUEUED)) == PS_IS_DETACHED ) { syncprov_qstart( so ); } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); -- 2.39.5