X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=servers%2Fslapd%2Foverlays%2Fsyncprov.c;h=3df59509f5e745cab9cfedc453e0871d50e00711;hb=7c05cb30d7e806da6e79ffb59f9532a05c61d46e;hp=dd8229fcba520ea1c97537361279429e16c25ef5;hpb=778024e013cb19b5ada7816367241e4bc1ecb016;p=openldap diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index dd8229fcba..3df59509f5 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -2,7 +2,7 @@ /* syncprov.c - syncrepl provider */ /* This work is part of OpenLDAP Software . * - * Copyright 2004-2013 The OpenLDAP Foundation. + * Copyright 2004-2016 The OpenLDAP Foundation. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -45,21 +45,33 @@ typedef struct modtarget { ldap_pvt_thread_mutex_t mt_mutex; } modtarget; +/* All the info of a psearch result that's shared between + * multiple queues + */ +typedef struct resinfo { + struct syncres *ri_list; + Entry *ri_e; + struct berval ri_dn; + struct berval ri_ndn; + struct berval ri_uuid; + struct berval ri_csn; + struct berval ri_cookie; + char ri_isref; + ldap_pvt_thread_mutex_t ri_mutex; +} resinfo; + /* A queued result of a persistent search */ typedef struct syncres { - struct syncres *s_next; - Entry *s_e; - struct berval s_dn; - struct berval s_ndn; - struct berval s_uuid; - struct berval s_csn; + struct syncres *s_next; /* list of results on this psearch queue */ + struct syncres *s_rilist; /* list of psearches using this result */ + resinfo *s_info; char s_mode; - char s_isreference; } syncres; /* Record of a persistent search */ typedef struct syncops { struct syncops *s_next; + struct syncprov_info_t *s_si; struct berval s_base; /* ndn of search base */ ID s_eid; /* entryID of search base */ Operation *s_op; /* search op */ @@ -159,13 +171,9 @@ typedef struct opcookie { short osid; /* sid of op csn */ short rsid; /* sid of relay */ short sreference; /* Is the entry a reference? */ + syncres ssres; } opcookie; -typedef struct mutexint { - ldap_pvt_thread_mutex_t mi_mutex; - int mi_int; -} mutexint; - typedef struct fbase_cookie { struct berval *fdn; /* DN of a modified entry, for scope testing */ syncops *fss; /* persistent search we're testing against */ @@ -444,6 +452,7 @@ syncprov_findbase( Operation *op, fbase_cookie *fc ) fop.o_hdr = op->o_hdr; fop.o_time = op->o_time; fop.o_tincr = op->o_tincr; + fop.o_extra = op->o_extra; cb.sc_response = findbase_cb; cb.sc_private = fc; @@ -451,7 +460,6 @@ syncprov_findbase( Operation *op, fbase_cookie *fc ) fop.o_sync_mode = 0; /* turn off sync mode */ fop.o_managedsait = SLAP_CONTROL_CRITICAL; fop.o_callback = &cb; - LDAP_SLIST_INIT( &fop.o_extra ); fop.o_tag = LDAP_REQ_SEARCH; fop.ors_scope = LDAP_SCOPE_BASE; fop.ors_limit = NULL; @@ -762,38 +770,32 @@ again: return rc; } -/* Should find a place to cache these */ -static mutexint *get_mutexint() -{ - mutexint *mi = ch_malloc( sizeof( mutexint )); - ldap_pvt_thread_mutex_init( &mi->mi_mutex ); - mi->mi_int = 1; - return mi; -} - -static void inc_mutexint( mutexint *mi ) +static void free_resinfo( syncres *sr ) { - ldap_pvt_thread_mutex_lock( &mi->mi_mutex ); - mi->mi_int++; - ldap_pvt_thread_mutex_unlock( &mi->mi_mutex ); -} - -/* return resulting counter */ -static int dec_mutexint( mutexint *mi ) -{ - int i; - ldap_pvt_thread_mutex_lock( &mi->mi_mutex ); - i = --mi->mi_int; - ldap_pvt_thread_mutex_unlock( &mi->mi_mutex ); - if ( !i ) { - ldap_pvt_thread_mutex_destroy( &mi->mi_mutex ); - ch_free( mi ); - } - return i; + syncres **st; + int freeit = 0; + ldap_pvt_thread_mutex_lock( &sr->s_info->ri_mutex ); + for (st = &sr->s_info->ri_list; *st; st = &(*st)->s_rilist) { + if (*st == sr) { + *st = sr->s_rilist; + break; + } + } + if ( !sr->s_info->ri_list ) + freeit = 1; + ldap_pvt_thread_mutex_unlock( &sr->s_info->ri_mutex ); + if ( freeit ) { + ldap_pvt_thread_mutex_destroy( &sr->s_info->ri_mutex ); + if ( sr->s_info->ri_e ) + entry_free( sr->s_info->ri_e ); + if ( !BER_BVISNULL( &sr->s_info->ri_cookie )) + ch_free( sr->s_info->ri_cookie.bv_val ); + ch_free( sr->s_info ); + } } -static void -syncprov_free_syncop( syncops *so ) +static int +syncprov_free_syncop( syncops *so, int unlink ) { syncres *sr, *srnext; GroupAssertion *ga, *gnext; @@ -802,9 +804,20 @@ syncprov_free_syncop( syncops *so ) /* already being freed, or still in use */ if ( !so->s_inuse || --so->s_inuse > 0 ) { ldap_pvt_thread_mutex_unlock( &so->s_mutex ); - return; + return 0; } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + if ( unlink ) { + syncops **sop; + ldap_pvt_thread_mutex_lock( &so->s_si->si_ops_mutex ); + for ( sop = &so->s_si->si_ops; *sop; sop = &(*sop)->s_next ) { + if ( *sop == so ) { + *sop = so->s_next; + break; + } + } + ldap_pvt_thread_mutex_unlock( &so->s_si->si_ops_mutex ); + } if ( so->s_flags & PS_IS_DETACHED ) { filter_free( so->s_op->ors_filter ); for ( ga = so->s_op->o_groups; ga; ga=gnext ) { @@ -816,21 +829,17 @@ syncprov_free_syncop( syncops *so ) ch_free( so->s_base.bv_val ); for ( sr=so->s_res; sr; sr=srnext ) { srnext = sr->s_next; - if ( sr->s_e ) { - if ( !dec_mutexint( sr->s_e->e_private )) { - sr->s_e->e_private = NULL; - entry_free( sr->s_e ); - } - } + free_resinfo( sr ); ch_free( sr ); } ldap_pvt_thread_mutex_destroy( &so->s_mutex ); ch_free( so ); + return 1; } /* Send a persistent search response */ static int -syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode ) +syncprov_sendresp( Operation *op, resinfo *ri, syncops *so, int mode ) { SlapReply rs = { REP_SEARCH }; struct berval cookie, csns[2]; @@ -843,7 +852,7 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode ) rs.sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, op->o_tmpmemctx ); rs.sr_ctrls[1] = NULL; rs.sr_flags = REP_CTRLS_MUSTBEFREED; - csns[0] = opc->sctxcsn; + csns[0] = ri->ri_csn; BER_BVZERO( &csns[1] ); slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, slap_serverID ? slap_serverID : -1 ); @@ -859,20 +868,20 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode ) e_uuid.e_attrs = &a_uuid; a_uuid.a_desc = slap_schema.si_ad_entryUUID; - a_uuid.a_nvals = &opc->suuid; + a_uuid.a_nvals = &ri->ri_uuid; rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid, mode, rs.sr_ctrls, 0, 1, &cookie ); op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); rs.sr_entry = &e_uuid; if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) { - e_uuid = *opc->se; + e_uuid = *ri->ri_e; e_uuid.e_private = NULL; } switch( mode ) { case LDAP_SYNC_ADD: - if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { + if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { rs.sr_ref = get_entry_referrals( op, rs.sr_entry ); rs.sr_err = send_search_reference( op, &rs ); ber_bvarray_free( rs.sr_ref ); @@ -885,9 +894,9 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode ) break; case LDAP_SYNC_DELETE: e_uuid.e_attrs = NULL; - e_uuid.e_name = opc->sdn; - e_uuid.e_nname = opc->sndn; - if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { + e_uuid.e_name = ri->ri_dn; + e_uuid.e_nname = ri->ri_ndn; + if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { struct berval bv = BER_BVNULL; rs.sr_ref = &bv; rs.sr_err = send_search_reference( op, &rs ); @@ -910,11 +919,8 @@ syncprov_qplay( Operation *op, syncops *so ) { slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key; syncres *sr; - opcookie opc; int rc = 0; - opc.son = on; - do { ldap_pvt_thread_mutex_lock( &so->s_mutex ); sr = so->s_res; @@ -932,25 +938,13 @@ syncprov_qplay( Operation *op, syncops *so ) SlapReply rs = { REP_INTERMEDIATE }; rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE, - &sr->s_csn, 0, NULL, 0 ); + &sr->s_info->ri_cookie, 0, NULL, 0 ); } else { - opc.sdn = sr->s_dn; - opc.sndn = sr->s_ndn; - opc.suuid = sr->s_uuid; - opc.sctxcsn = sr->s_csn; - opc.sreference = sr->s_isreference; - opc.se = sr->s_e; - - rc = syncprov_sendresp( op, &opc, so, sr->s_mode ); + rc = syncprov_sendresp( op, sr->s_info, so, sr->s_mode ); } } - if ( sr->s_e ) { - if ( !dec_mutexint( sr->s_e->e_private )) { - sr->s_e->e_private = NULL; - entry_free ( sr->s_e ); - } - } + free_resinfo( sr ); ch_free( sr ); if ( so->s_op->o_abandon ) @@ -1010,7 +1004,7 @@ syncprov_qtask( void *ctx, void *arg ) rc = syncprov_qplay( op, so ); /* decrement use count... */ - syncprov_free_syncop( so ); + syncprov_free_syncop( so, 1 ); return NULL; } @@ -1030,48 +1024,79 @@ static int syncprov_qresp( opcookie *opc, syncops *so, int mode ) { syncres *sr; + resinfo *ri; int srsize; - struct berval cookie = opc->sctxcsn; - - if ( mode == LDAP_SYNC_NEW_COOKIE ) { - syncprov_info_t *si = opc->son->on_bi.bi_private; - - slap_compose_sync_cookie( NULL, &cookie, si->si_ctxcsn, - so->s_rid, slap_serverID ? slap_serverID : -1); - } + struct berval csn = opc->sctxcsn; - srsize = sizeof(syncres) + opc->suuid.bv_len + 1 + - opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1; - if ( cookie.bv_len ) - srsize += cookie.bv_len + 1; - sr = ch_malloc( srsize ); + sr = ch_malloc( sizeof( syncres )); sr->s_next = NULL; - sr->s_e = opc->se; - /* bump refcount on this entry */ - if ( opc->se ) - inc_mutexint( opc->se->e_private ); - sr->s_dn.bv_val = (char *)(sr + 1); - sr->s_dn.bv_len = opc->sdn.bv_len; sr->s_mode = mode; - sr->s_isreference = opc->sreference; - sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, - opc->sdn.bv_val ) + 1; - sr->s_ndn.bv_len = opc->sndn.bv_len; - sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, - opc->sndn.bv_val ) + 1; - sr->s_uuid.bv_len = opc->suuid.bv_len; - AC_MEMCPY( sr->s_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); - if ( cookie.bv_len ) { - sr->s_csn.bv_val = sr->s_uuid.bv_val + sr->s_uuid.bv_len + 1; - strcpy( sr->s_csn.bv_val, cookie.bv_val ); - } else { - sr->s_csn.bv_val = NULL; - } - sr->s_csn.bv_len = cookie.bv_len; + if ( !opc->ssres.s_info ) { + + srsize = sizeof( resinfo ); + if ( csn.bv_len ) + srsize += csn.bv_len + 1; + + if ( opc->se ) { + Attribute *a; + ri = ch_malloc( srsize ); + ri->ri_dn = opc->se->e_name; + ri->ri_ndn = opc->se->e_nname; + a = attr_find( opc->se->e_attrs, slap_schema.si_ad_entryUUID ); + if ( a ) + ri->ri_uuid = a->a_nvals[0]; + else + ri->ri_uuid.bv_len = 0; + if ( csn.bv_len ) { + ri->ri_csn.bv_val = (char *)(ri + 1); + ri->ri_csn.bv_len = csn.bv_len; + memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len ); + ri->ri_csn.bv_val[csn.bv_len] = '\0'; + } else { + ri->ri_csn.bv_val = NULL; + } + } else { + srsize += opc->suuid.bv_len + + opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1; + ri = ch_malloc( srsize ); + ri->ri_dn.bv_val = (char *)(ri + 1); + ri->ri_dn.bv_len = opc->sdn.bv_len; + ri->ri_ndn.bv_val = lutil_strcopy( ri->ri_dn.bv_val, + opc->sdn.bv_val ) + 1; + ri->ri_ndn.bv_len = opc->sndn.bv_len; + ri->ri_uuid.bv_val = lutil_strcopy( ri->ri_ndn.bv_val, + opc->sndn.bv_val ) + 1; + ri->ri_uuid.bv_len = opc->suuid.bv_len; + AC_MEMCPY( ri->ri_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); + if ( csn.bv_len ) { + ri->ri_csn.bv_val = ri->ri_uuid.bv_val + ri->ri_uuid.bv_len; + memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len ); + ri->ri_csn.bv_val[csn.bv_len] = '\0'; + } else { + ri->ri_csn.bv_val = NULL; + } + } + ri->ri_list = &opc->ssres; + ri->ri_e = opc->se; + ri->ri_csn.bv_len = csn.bv_len; + ri->ri_isref = opc->sreference; + BER_BVZERO( &ri->ri_cookie ); + ldap_pvt_thread_mutex_init( &ri->ri_mutex ); + opc->se = NULL; + opc->ssres.s_info = ri; + } + ri = opc->ssres.s_info; + sr->s_info = ri; + ldap_pvt_thread_mutex_lock( &ri->ri_mutex ); + sr->s_rilist = ri->ri_list; + ri->ri_list = sr; + if ( mode == LDAP_SYNC_NEW_COOKIE && BER_BVISNULL( &ri->ri_cookie )) { + syncprov_info_t *si = opc->son->on_bi.bi_private; - if ( mode == LDAP_SYNC_NEW_COOKIE && cookie.bv_val ) { - ch_free( cookie.bv_val ); + slap_compose_sync_cookie( NULL, &ri->ri_cookie, si->si_ctxcsn, + so->s_rid, slap_serverID ? slap_serverID : -1); } + ldap_pvt_thread_mutex_unlock( &ri->ri_mutex ); ldap_pvt_thread_mutex_lock( &so->s_mutex ); if ( !so->s_res ) { @@ -1106,9 +1131,7 @@ syncprov_drop_psearch( syncops *so, int lock ) if ( lock ) ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex ); } - syncprov_free_syncop( so ); - - return 0; + return syncprov_free_syncop( so, 0 ); } static int @@ -1116,7 +1139,7 @@ syncprov_ab_cleanup( Operation *op, SlapReply *rs ) { slap_callback *sc = op->o_callback; op->o_callback = sc->sc_next; - syncprov_drop_psearch( op->o_callback->sc_private, 0 ); + syncprov_drop_psearch( sc->sc_private, 0 ); op->o_tmpfree( sc, op->o_tmpmemctx ); return 0; } @@ -1126,15 +1149,14 @@ syncprov_op_abandon( Operation *op, SlapReply *rs ) { slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; syncprov_info_t *si = on->on_bi.bi_private; - syncops *so, *soprev; + syncops *so, **sop; ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); - for ( so=si->si_ops, soprev = (syncops *)&si->si_ops; so; - soprev=so, so=so->s_next ) { + for ( sop=&si->si_ops; (so = *sop); sop = &(*sop)->s_next ) { if ( so->s_op->o_connid == op->o_connid && so->s_op->o_msgid == op->orn_msgid ) { so->s_op->o_abandon = 1; - soprev->s_next = so->s_next; + *sop = so->s_next; break; } } @@ -1168,10 +1190,10 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) syncprov_info_t *si = on->on_bi.bi_private; fbase_cookie fc; - syncops *ss, *sprev, *snext; + syncops **pss; Entry *e = NULL; Attribute *a; - int rc; + int rc, gonext; struct berval newdn; int freefdn = 0; BackendDB *b0 = op->o_bd, db; @@ -1194,10 +1216,8 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on ); /* If we're sending responses now, make a copy and unlock the DB */ if ( e && !saveit ) { - if ( !opc->se ) { + if ( !opc->se ) opc->se = entry_dup( e ); - opc->se->e_private = get_mutexint(); - } overlay_entry_release_ov( op, e, 0, on ); e = opc->se; } @@ -1208,10 +1228,8 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) } else { e = op->ora_e; if ( !saveit ) { - if ( !opc->se ) { + if ( !opc->se ) opc->se = entry_dup( e ); - opc->se->e_private = get_mutexint(); - } e = opc->se; } } @@ -1231,15 +1249,15 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) } ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); - for (ss = si->si_ops, sprev = (syncops *)&si->si_ops; ss; - sprev = ss, ss=snext) + for (pss = &si->si_ops; *pss; pss = gonext ? &(*pss)->s_next : pss) { Operation op2; Opheader oh; syncmatches *sm; int found = 0; + syncops *snext, *ss = *pss; - snext = ss->s_next; + gonext = 1; if ( ss->s_op->o_abandon ) continue; @@ -1268,9 +1286,10 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) SlapReply rs = {REP_RESULT}; send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED, "search base has changed" ); - sprev->s_next = snext; - syncprov_drop_psearch( ss, 1 ); - ss = sprev; + snext = ss->s_next; + if ( syncprov_drop_psearch( ss, 1 ) ) + *pss = snext; + gonext = 0; continue; } @@ -1312,8 +1331,8 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) phase otherwise (ITS#6555) */ op2.ors_filter = ss->s_op->ors_filter->f_and->f_next; } - ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); rc = test_filter( &op2, e, op2.ors_filter ); + ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); } Debug( LDAP_DEBUG_TRACE, "syncprov_matchops: sid %03x fscope %d rc %d\n", @@ -1344,7 +1363,11 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) /* Decrement s_inuse, was incremented when called * with saveit == TRUE */ - syncprov_free_syncop( ss ); + snext = ss->s_next; + if ( syncprov_free_syncop( ss, 0 ) ) { + *pss = snext; + gonext = 0; + } } } ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); @@ -1357,12 +1380,11 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) overlay_entry_release_ov( op, e, 0, on ); op->o_bd = b0; } - if ( opc->se && !saveit ) { - if ( !dec_mutexint( opc->se->e_private )) { - opc->se->e_private = NULL; + if ( !saveit ) { + if ( opc->ssres.s_info ) + free_resinfo( &opc->ssres ); + else if ( opc->se ) entry_free( opc->se ); - opc->se = NULL; - } } if ( freefdn ) { op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx ); @@ -1387,15 +1409,23 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs ) for (sm = opc->smatches; sm; sm=snext) { snext = sm->sm_next; - syncprov_free_syncop( sm->sm_op ); + syncprov_free_syncop( sm->sm_op, 1 ); op->o_tmpfree( sm, op->o_tmpmemctx ); } /* Remove op from lock table */ mt = opc->smt; if ( mt ) { + modinst *mi = (modinst *)(opc+1), **m2; ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); - mt->mt_mods = mt->mt_mods->mi_next; + for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) { + if ( *m2 == mi ) { + *m2 = mi->mi_next; + if ( mt->mt_tail == mi ) + mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2; + break; + } + } /* If there are more, promote the next one */ if ( mt->mt_mods ) { ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); @@ -1887,10 +1917,13 @@ syncprov_op_response( Operation *op, SlapReply *rs ) } else { ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); } + if ( csn_changed ) + si->si_numops++; goto leave; } - si->si_numops++; + if ( csn_changed ) + si->si_numops++; if ( si->si_chkops || si->si_chktime ) { /* Never checkpoint adding the context entry, * it will deadlock @@ -2082,24 +2115,38 @@ syncprov_op_mod( Operation *op, SlapReply *rs ) /* See if we're already modifying this entry... */ mtdummy.mt_dn = op->o_req_ndn; +retry: ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); if ( mt ) { ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); if ( mt->mt_mods == NULL ) { /* Cannot reuse this mt, as another thread is about - * to release it in syncprov_op_cleanup. + * to release it in syncprov_op_cleanup. Wait for them + * to finish; our own insert is required to succeed. */ ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); - mt = NULL; + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); + ldap_pvt_thread_yield(); + goto retry; } } if ( mt ) { - ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); mt->mt_tail->mi_next = mi; mt->mt_tail = mi; + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); /* wait for this op to get to head of list */ while ( mt->mt_mods != mi ) { + modinst *m2; + /* don't wait on other mods from the same thread */ + for ( m2 = mt->mt_mods; m2; m2 = m2->mi_next ) { + if ( m2->mi_op->o_threadctx == op->o_threadctx ) { + break; + } + } + if ( m2 ) + break; + ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); /* FIXME: if dynamic config can delete overlays or * databases we'll have to check for cleanup here. @@ -2115,12 +2162,21 @@ syncprov_op_mod( Operation *op, SlapReply *rs ) /* clean up if the caller is giving up */ if ( op->o_abandon ) { - modinst *m2; - for ( m2 = mt->mt_mods; m2 && m2->mi_next != mi; - m2 = m2->mi_next ); - if ( m2 ) { - m2->mi_next = mi->mi_next; - if ( mt->mt_tail == mi ) mt->mt_tail = m2; + modinst **m2; + slap_callback **sc; + for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) { + if ( *m2 == mi ) { + *m2 = mi->mi_next; + if ( mt->mt_tail == mi ) + mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2; + break; + } + } + for (sc = &op->o_callback; ; sc = &(*sc)->sc_next) { + if ( *sc == cb ) { + *sc = cb->sc_next; + break; + } } op->o_tmpfree( cb, op->o_tmpmemctx ); ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); @@ -2468,7 +2524,6 @@ syncprov_op_search( Operation *op, SlapReply *rs ) } sop = ch_malloc( sizeof( syncops )); *sop = so; - ldap_pvt_thread_mutex_init( &sop->s_mutex ); sop->s_rid = srs->sr_state.rid; sop->s_sid = srs->sr_state.sid; /* set refcount=2 to prevent being freed out from under us @@ -2493,12 +2548,21 @@ syncprov_op_search( Operation *op, SlapReply *rs ) ldap_pvt_thread_yield(); ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); } + if ( op->o_abandon ) { + ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); + ch_free( sop ); + return SLAPD_ABANDON; + } + ldap_pvt_thread_mutex_init( &sop->s_mutex ); sop->s_next = si->si_ops; + sop->s_si = si; si->si_ops = sop; ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); } - /* snapshot the ctxcsn */ + /* snapshot the ctxcsn + * Note: this must not be done before the psearch setup. (ITS#8365) + */ ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); numcsns = si->si_numcsns; if ( numcsns ) { @@ -2512,17 +2576,19 @@ syncprov_op_search( Operation *op, SlapReply *rs ) } dirty = si->si_dirty; ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); - + /* If we have a cookie, handle the PRESENT lookups */ if ( srs->sr_state.ctxcsn ) { sessionlog *sl; int i, j; - /* If we don't have any CSN of our own yet, pretend nothing - * has changed. + /* If we don't have any CSN of our own yet, bail out. */ - if ( !numcsns ) - goto no_change; + if ( !numcsns ) { + rs->sr_err = LDAP_UNWILLING_TO_PERFORM; + rs->sr_text = "consumer has state info but provider doesn't!"; + goto bailout; + } if ( !si->si_nopres ) do_present = SS_PRESENT; @@ -2612,7 +2678,7 @@ bailout: /* If nothing has changed, shortcut it */ if ( !changed && !dirty ) { do_present = 0; -no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { +no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { LDAPControl *ctrls[2]; ctrls[0] = NULL; @@ -2699,6 +2765,9 @@ no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { } } } else { + /* The consumer knows nothing, we know nothing. OK. */ + if (!numcsns) + goto no_change; /* No consumer state, assume something has changed */ changed = SS_CHANGED; } @@ -3103,7 +3172,7 @@ syncprov_db_open( char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ]; struct berval csn; - if ( SLAP_SYNC_SHADOW( op->o_bd )) { + if ( slap_serverID || SLAP_SYNC_SHADOW( op->o_bd )) { /* If we're also a consumer, then don't generate anything. * Wait for our provider to send it to us, or for a local * modify if we have multimaster. @@ -3172,6 +3241,7 @@ syncprov_db_close( #ifdef SLAP_CONFIG_DELETE if ( !slapd_shutdown ) { + ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); for ( so=si->si_ops, sonext=so; so; so=sonext ) { SlapReply rs = {REP_RESULT}; rs.sr_err = LDAP_UNAVAILABLE; @@ -3180,6 +3250,7 @@ syncprov_db_close( syncprov_drop_psearch( so, 0); } si->si_ops=NULL; + ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); } overlay_unregister_control( be, LDAP_CONTROL_SYNC ); #endif /* SLAP_CONFIG_DELETE */