From: Howard Chu Date: Sun, 2 Oct 2005 03:58:00 +0000 (+0000) Subject: Always queue psearch responses (ITS#3671 revisited) X-Git-Tag: OPENLDAP_REL_ENG_2_2_MP~335 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=47a055b59b436c5a217099eaaf183416af1cc8a3;p=openldap Always queue psearch responses (ITS#3671 revisited) --- diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index 05f56c447d..3d71305dee 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -26,6 +26,7 @@ #include "lutil.h" #include "slap.h" #include "config.h" +#include "ldap_rq.h" /* A modify request on a particular entry */ typedef struct modinst { @@ -63,6 +64,7 @@ typedef struct syncops { int s_inuse; /* reference count */ struct syncres *s_res; struct syncres *s_restail; + void *s_qtask; /* task for playing psearch responses */ ldap_pvt_thread_mutex_t s_mutex; } syncops; @@ -680,90 +682,9 @@ again: return rc; } -/* Queue a persistent search response */ -static int -syncprov_qresp( opcookie *opc, syncops *so, int mode ) -{ - syncres *sr; - - sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 + - opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 ); - sr->s_next = NULL; - sr->s_dn.bv_val = (char *)(sr + 1); - sr->s_dn.bv_len = opc->sdn.bv_len; - sr->s_mode = mode; - sr->s_isreference = opc->sreference; - sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val ); - sr->s_ndn.bv_len = opc->sndn.bv_len; - *(sr->s_ndn.bv_val++) = '\0'; - sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val ); - sr->s_uuid.bv_len = opc->suuid.bv_len; - *(sr->s_uuid.bv_val++) = '\0'; - sr->s_csn.bv_val = lutil_strcopy( sr->s_uuid.bv_val, opc->suuid.bv_val ); - sr->s_csn.bv_len = opc->sctxcsn.bv_len; - strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val ); - - if ( !so->s_res ) { - so->s_res = sr; - } else { - so->s_restail->s_next = sr; - } - so->s_restail = sr; - ldap_pvt_thread_mutex_unlock( &so->s_mutex ); - return LDAP_SUCCESS; -} - -/* Play back queued responses */ -static int -syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue ); - -static int -syncprov_qplay( Operation *op, slap_overinst *on, syncops *so ) -{ - syncres *sr, *srnext; - Entry *e; - opcookie opc; - int rc; - - opc.son = on; - op->o_bd->bd_info = (BackendInfo *)on->on_info; - for (sr = so->s_res; sr; sr=srnext) { - srnext = sr->s_next; - opc.sdn = sr->s_dn; - opc.sndn = sr->s_ndn; - opc.suuid = sr->s_uuid; - opc.sctxcsn = sr->s_csn; - opc.sreference = sr->s_isreference; - e = NULL; - - if ( sr->s_mode != LDAP_SYNC_DELETE ) { - rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e ); - if ( rc ) { - ch_free( sr ); - so->s_res = srnext; - continue; - } - } - rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode, 0 ); - - if ( e ) { - be_entry_release_rw( op, e, 0 ); - } - if ( rc ) - break; - - ch_free( sr ); - so->s_res = srnext; - } - op->o_bd->bd_info = (BackendInfo *)on; - if ( !so->s_res ) - so->s_restail = NULL; - return rc; -} - /* Send a persistent search response */ static int -syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue ) +syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode) { slap_overinst *on = opc->son; @@ -772,56 +693,17 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mod struct berval cookie; Entry e_uuid = {0}; Attribute a_uuid = {0}; - Operation sop = *so->s_op; - Opheader ohdr; if ( so->s_op->o_abandon ) return SLAPD_ABANDON; - ohdr = *sop.o_hdr; - sop.o_hdr = &ohdr; - sop.o_tmpmemctx = op->o_tmpmemctx; - sop.o_bd = op->o_bd; - sop.o_controls = op->o_controls; - sop.o_private = op->o_private; - sop.o_callback = NULL; - - /* If queueing is allowed */ - if ( queue ) { - ldap_pvt_thread_mutex_lock( &so->s_mutex ); - /* If we're still in refresh mode, must queue */ - if (so->s_flags & PS_IS_REFRESHING) { - return syncprov_qresp( opc, so, mode ); - } - /* If connection is free but queue is non-empty, - * try to flush the queue. - */ - if ( so->s_res ) { - rs.sr_err = syncprov_qplay( &sop, on, so ); - } - /* If the connection is busy, must queue */ - if ( sop.o_conn->c_writewaiter || rs.sr_err == LDAP_BUSY ) { - return syncprov_qresp( opc, so, mode ); - } - ldap_pvt_thread_mutex_unlock( &so->s_mutex ); - - /* If syncprov_qplay returned any other error, bail out. */ - if ( rs.sr_err ) { - return rs.sr_err; - } - } else { - /* Queueing not allowed and conn is busy, give up */ - if ( sop.o_conn->c_writewaiter ) - return LDAP_BUSY; - } - ctrls[1] = NULL; slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, so->s_rid ); e_uuid.e_attrs = &a_uuid; a_uuid.a_desc = slap_schema.si_ad_entryUUID; a_uuid.a_nvals = &opc->suuid; - rs.sr_err = syncprov_state_ctrl( &sop, &rs, &e_uuid, + rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid, mode, ctrls, 0, 1, &cookie ); rs.sr_ctrls = ctrls; @@ -832,8 +714,8 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mod if ( rs.sr_entry->e_private ) rs.sr_flags = REP_ENTRY_MUSTRELEASE; if ( opc->sreference ) { - rs.sr_ref = get_entry_referrals( &sop, rs.sr_entry ); - send_search_reference( &sop, &rs ); + rs.sr_ref = get_entry_referrals( op, rs.sr_entry ); + send_search_reference( op, &rs ); ber_bvarray_free( rs.sr_ref ); if ( !rs.sr_entry ) *e = NULL; @@ -844,8 +726,8 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mod rs.sr_entry = *e; if ( rs.sr_entry->e_private ) rs.sr_flags = REP_ENTRY_MUSTRELEASE; - rs.sr_attrs = sop.ors_attrs; - send_search_entry( &sop, &rs ); + rs.sr_attrs = op->ors_attrs; + send_search_entry( op, &rs ); if ( !rs.sr_entry ) *e = NULL; break; @@ -857,30 +739,159 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mod if ( opc->sreference ) { struct berval bv = BER_BVNULL; rs.sr_ref = &bv; - send_search_reference( &sop, &rs ); + send_search_reference( op, &rs ); } else { - send_search_entry( &sop, &rs ); + send_search_entry( op, &rs ); } break; default: assert(0); } op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx ); - op->o_private = sop.o_private; rs.sr_ctrls = NULL; - /* Check queue again here; if we were hanging in a send and eventually - * recovered, there may be more to send now. But don't check if the - * original psearch has been abandoned. - */ - if ( so->s_op->o_abandon ) - return SLAPD_ABANDON; - if ( rs.sr_err == LDAP_SUCCESS && queue && so->s_res ) { + return rs.sr_err; +} + +/* Play back queued responses */ +static int +syncprov_qplay( Operation *op, slap_overinst *on, syncops *so ) +{ + syncres *sr; + Entry *e; + opcookie opc; + int rc; + + opc.son = on; + op->o_bd->bd_info = (BackendInfo *)on->on_info; + + for (;;) { ldap_pvt_thread_mutex_lock( &so->s_mutex ); - rs.sr_err = syncprov_qplay( &sop, on, so ); + sr = so->s_res; + if ( sr ) + so->s_res = sr->s_next; + if ( !so->s_res ) + so->s_restail = NULL; ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + + if ( !sr ) + break; + + if ( !so->s_op->o_abandon ) { + opc.sdn = sr->s_dn; + opc.sndn = sr->s_ndn; + opc.suuid = sr->s_uuid; + opc.sctxcsn = sr->s_csn; + opc.sreference = sr->s_isreference; + e = NULL; + + if ( sr->s_mode != LDAP_SYNC_DELETE ) { + rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e ); + if ( rc ) { + ch_free( sr ); + continue; + } + } + rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode ); + + if ( e ) { + be_entry_release_rw( op, e, 0 ); + } + if ( rc ) + break; + } + + ch_free( sr ); } - return rs.sr_err; + op->o_bd->bd_info = (BackendInfo *)on; + return rc; +} + +/* runqueue task for playing back queued responses */ +static void * +syncprov_qtask( void *ctx, void *arg ) +{ + struct re_s *rtask = arg; + syncops *so = rtask->arg; + slap_overinst *on = so->s_op->o_private; + char opbuf[OPERATION_BUFFER_SIZE]; + Operation *op; + BackendDB be; + + op = (Operation *)opbuf; + memset( op, 0, sizeof(opbuf)); + op->o_hdr = (Opheader *)(op+1); + op->o_controls = (void **)(op->o_hdr+1); + + *op->o_hdr = *so->s_op->o_hdr; + + op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx); + op->o_tmpmfuncs = &slap_sl_mfuncs; + op->o_threadctx = ctx; + + /* syncprov_qplay expects a fake db */ + be = *so->s_op->o_bd; + be.be_flags |= SLAP_DBFLAG_OVERLAY; + op->o_bd = &be; + op->o_private = NULL; + op->o_callback = NULL; + + syncprov_qplay( op, on, so ); + + /* wait until we get explicitly scheduled again */ + ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); + ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask ); + ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 1 ); + ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); + + return NULL; +} + +/* Queue a persistent search response */ +static int +syncprov_qresp( opcookie *opc, syncops *so, int mode ) +{ + syncres *sr; + + ldap_pvt_thread_mutex_lock( &so->s_mutex ); + sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 + + opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 ); + sr->s_next = NULL; + sr->s_dn.bv_val = (char *)(sr + 1); + sr->s_dn.bv_len = opc->sdn.bv_len; + sr->s_mode = mode; + sr->s_isreference = opc->sreference; + sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val ); + sr->s_ndn.bv_len = opc->sndn.bv_len; + *(sr->s_ndn.bv_val++) = '\0'; + sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val ); + sr->s_uuid.bv_len = opc->suuid.bv_len; + *(sr->s_uuid.bv_val++) = '\0'; + sr->s_csn.bv_val = lutil_strcopy( sr->s_uuid.bv_val, opc->suuid.bv_val ); + sr->s_csn.bv_len = opc->sctxcsn.bv_len; + strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val ); + + if ( !so->s_res ) { + so->s_res = sr; + } else { + so->s_restail->s_next = sr; + } + so->s_restail = sr; + if ( so->s_flags & PS_IS_DETACHED ) { + ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex ); + if ( !so->s_qtask ) { + so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, 0, + syncprov_qtask, so, "syncprov_qtask", + so->s_op->o_conn->c_peer_name.bv_val ); + } else { + if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask )) { + ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 ); + } + } + ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex ); + } + ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + return LDAP_SUCCESS; } static void @@ -1019,12 +1030,10 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) e = op->ora_e; } - if ( saveit ) { + if ( saveit || op->o_tag == LDAP_REQ_ADD ) { ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx ); ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx ); opc->sreference = is_entry_referral( e ); - } - if ( saveit || op->o_tag == LDAP_REQ_ADD ) { a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID ); if ( a ) ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx ); @@ -1082,15 +1091,15 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) /* if found send UPDATE else send ADD */ ss->s_inuse++; ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); - syncprov_sendresp( op, opc, ss, &e, - found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD, 1 ); + syncprov_qresp( opc, ss, + found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD ); ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); ss->s_inuse--; } } else if ( !saveit && found ) { /* send DELETE */ ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); - syncprov_sendresp( op, opc, ss, NULL, LDAP_SYNC_DELETE, 1 ); + syncprov_qresp( opc, ss, LDAP_SYNC_DELETE ); ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); } } @@ -1444,8 +1453,7 @@ syncprov_op_response( Operation *op, SlapReply *rs ) for ( sm = opc->smatches; sm; sm=sm->sm_next ) { if ( sm->sm_op->s_op->o_abandon ) continue; - syncprov_sendresp( op, opc, sm->sm_op, NULL, - LDAP_SYNC_DELETE, 1 ); + syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE ); } break; } @@ -1633,7 +1641,7 @@ syncprov_search_cleanup( Operation *op, SlapReply *rs ) } static void -syncprov_detach_op( Operation *op, syncops *so ) +syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on ) { Operation *op2; int i, alen = 0; @@ -1659,8 +1667,9 @@ syncprov_detach_op( Operation *op, syncops *so ) *op2->o_hdr = *op->o_hdr; op2->o_tag = op->o_tag; op2->o_time = op->o_time; - op2->o_bd = op->o_bd; + op2->o_bd = on->on_info->oi_origdb; op2->o_request = op->o_request; + op2->o_private = on; if ( i ) { op2->ors_attrs = (AttributeName *)(op2->o_hdr + 1); @@ -1761,25 +1770,23 @@ syncprov_search_response( Operation *op, SlapReply *rs ) 0, 1, &cookie, ss->ss_present ? LDAP_SYNC_REFRESH_PRESENTS : LDAP_SYNC_REFRESH_DELETES ); } else { - int locked = 0; /* It's RefreshAndPersist, transition to Persist phase */ syncprov_sendinfo( op, rs, ( ss->ss_present && rs->sr_nentries ) ? LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, &cookie, 1, NULL, 0 ); /* Flush any queued persist messages */ if ( ss->ss_so->s_res ) { - ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex ); - locked = 1; syncprov_qplay( op, on, ss->ss_so ); } + /* Detach this Op from frontend control */ + ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex ); + /* Turn off the refreshing flag */ ss->ss_so->s_flags ^= PS_IS_REFRESHING; - if ( locked ) - ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex ); - /* Detach this Op from frontend control */ - syncprov_detach_op( op, ss->ss_so ); + syncprov_detach_op( op, ss->ss_so, on ); + ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex ); return LDAP_SUCCESS; }