X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=servers%2Fslapd%2Foverlays%2Fsyncprov.c;h=160b57a28990b56f2d8427bb13ca43c54fad3232;hb=0ba50a1d064ef71145fe0a08d813d710634a23b4;hp=5ef2eaae896ba1d1e3db6ea794c042ac40e4903b;hpb=5e028ae83938177473d26ceb6b12d1c0728d9766;p=openldap diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index 5ef2eaae89..160b57a289 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -2,7 +2,7 @@ /* syncprov.c - syncrepl provider */ /* This work is part of OpenLDAP Software . * - * Copyright 2004-2014 The OpenLDAP Foundation. + * Copyright 2004-2018 The OpenLDAP Foundation. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -28,7 +28,9 @@ #include "config.h" #include "ldap_rq.h" +#ifdef LDAP_DEVEL #define CHECK_CSN 1 +#endif /* A modify request on a particular entry */ typedef struct modinst { @@ -43,21 +45,33 @@ typedef struct modtarget { ldap_pvt_thread_mutex_t mt_mutex; } modtarget; +/* All the info of a psearch result that's shared between + * multiple queues + */ +typedef struct resinfo { + struct syncres *ri_list; + Entry *ri_e; + struct berval ri_dn; + struct berval ri_ndn; + struct berval ri_uuid; + struct berval ri_csn; + struct berval ri_cookie; + char ri_isref; + ldap_pvt_thread_mutex_t ri_mutex; +} resinfo; + /* A queued result of a persistent search */ typedef struct syncres { - struct syncres *s_next; - Entry *s_e; - struct berval s_dn; - struct berval s_ndn; - struct berval s_uuid; - struct berval s_csn; + struct syncres *s_next; /* list of results on this psearch queue */ + struct syncres *s_rilist; /* list of psearches using this result */ + resinfo *s_info; char s_mode; - char s_isreference; } syncres; /* Record of a persistent search */ typedef struct syncops { struct syncops *s_next; + struct syncprov_info_t *s_si; struct berval s_base; /* ndn of search base */ ID s_eid; /* entryID of search base */ Operation *s_op; /* search op */ @@ -75,6 +89,7 @@ typedef struct syncops { int s_inuse; /* reference count */ struct syncres *s_res; struct syncres *s_restail; + void *s_pool_cookie; ldap_pvt_thread_mutex_t s_mutex; } syncops; @@ -116,6 +131,7 @@ typedef struct sessionlog { int sl_numcsns; int sl_num; int sl_size; + int sl_playing; slog_entry *sl_head; slog_entry *sl_tail; ldap_pvt_thread_mutex_t sl_mutex; @@ -157,13 +173,9 @@ typedef struct opcookie { short osid; /* sid of op csn */ short rsid; /* sid of relay */ short sreference; /* Is the entry a reference? */ + syncres ssres; } opcookie; -typedef struct mutexint { - ldap_pvt_thread_mutex_t mi_mutex; - int mi_int; -} mutexint; - typedef struct fbase_cookie { struct berval *fdn; /* DN of a modified entry, for scope testing */ syncops *fss; /* persistent search we're testing against */ @@ -760,49 +772,59 @@ again: return rc; } -/* Should find a place to cache these */ -static mutexint *get_mutexint() +static void free_resinfo( syncres *sr ) { - mutexint *mi = ch_malloc( sizeof( mutexint )); - ldap_pvt_thread_mutex_init( &mi->mi_mutex ); - mi->mi_int = 1; - return mi; -} - -static void inc_mutexint( mutexint *mi ) -{ - ldap_pvt_thread_mutex_lock( &mi->mi_mutex ); - mi->mi_int++; - ldap_pvt_thread_mutex_unlock( &mi->mi_mutex ); + syncres **st; + int freeit = 0; + ldap_pvt_thread_mutex_lock( &sr->s_info->ri_mutex ); + for (st = &sr->s_info->ri_list; *st; st = &(*st)->s_rilist) { + if (*st == sr) { + *st = sr->s_rilist; + break; + } + } + if ( !sr->s_info->ri_list ) + freeit = 1; + ldap_pvt_thread_mutex_unlock( &sr->s_info->ri_mutex ); + if ( freeit ) { + ldap_pvt_thread_mutex_destroy( &sr->s_info->ri_mutex ); + if ( sr->s_info->ri_e ) + entry_free( sr->s_info->ri_e ); + if ( !BER_BVISNULL( &sr->s_info->ri_cookie )) + ch_free( sr->s_info->ri_cookie.bv_val ); + ch_free( sr->s_info ); + } } -/* return resulting counter */ -static int dec_mutexint( mutexint *mi ) -{ - int i; - ldap_pvt_thread_mutex_lock( &mi->mi_mutex ); - i = --mi->mi_int; - ldap_pvt_thread_mutex_unlock( &mi->mi_mutex ); - if ( !i ) { - ldap_pvt_thread_mutex_destroy( &mi->mi_mutex ); - ch_free( mi ); - } - return i; -} +#define FS_UNLINK 1 +#define FS_LOCK 2 -static void -syncprov_free_syncop( syncops *so ) +static int +syncprov_free_syncop( syncops *so, int flags ) { syncres *sr, *srnext; GroupAssertion *ga, *gnext; - ldap_pvt_thread_mutex_lock( &so->s_mutex ); + if ( flags & FS_LOCK ) + ldap_pvt_thread_mutex_lock( &so->s_mutex ); /* already being freed, or still in use */ if ( !so->s_inuse || --so->s_inuse > 0 ) { - ldap_pvt_thread_mutex_unlock( &so->s_mutex ); - return; + if ( flags & FS_LOCK ) + ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + return 0; } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + if (( flags & FS_UNLINK ) && so->s_si ) { + syncops **sop; + ldap_pvt_thread_mutex_lock( &so->s_si->si_ops_mutex ); + for ( sop = &so->s_si->si_ops; *sop; sop = &(*sop)->s_next ) { + if ( *sop == so ) { + *sop = so->s_next; + break; + } + } + ldap_pvt_thread_mutex_unlock( &so->s_si->si_ops_mutex ); + } if ( so->s_flags & PS_IS_DETACHED ) { filter_free( so->s_op->ors_filter ); for ( ga = so->s_op->o_groups; ga; ga=gnext ) { @@ -814,21 +836,17 @@ syncprov_free_syncop( syncops *so ) ch_free( so->s_base.bv_val ); for ( sr=so->s_res; sr; sr=srnext ) { srnext = sr->s_next; - if ( sr->s_e ) { - if ( !dec_mutexint( sr->s_e->e_private )) { - sr->s_e->e_private = NULL; - entry_free( sr->s_e ); - } - } + free_resinfo( sr ); ch_free( sr ); } ldap_pvt_thread_mutex_destroy( &so->s_mutex ); ch_free( so ); + return 1; } /* Send a persistent search response */ static int -syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode ) +syncprov_sendresp( Operation *op, resinfo *ri, syncops *so, int mode ) { SlapReply rs = { REP_SEARCH }; struct berval cookie, csns[2]; @@ -841,7 +859,7 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode ) rs.sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2, op->o_tmpmemctx ); rs.sr_ctrls[1] = NULL; rs.sr_flags = REP_CTRLS_MUSTBEFREED; - csns[0] = opc->sctxcsn; + csns[0] = ri->ri_csn; BER_BVZERO( &csns[1] ); slap_compose_sync_cookie( op, &cookie, csns, so->s_rid, slap_serverID ? slap_serverID : -1 ); @@ -857,20 +875,20 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode ) e_uuid.e_attrs = &a_uuid; a_uuid.a_desc = slap_schema.si_ad_entryUUID; - a_uuid.a_nvals = &opc->suuid; + a_uuid.a_nvals = &ri->ri_uuid; rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid, mode, rs.sr_ctrls, 0, 1, &cookie ); op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx ); rs.sr_entry = &e_uuid; if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) { - e_uuid = *opc->se; + e_uuid = *ri->ri_e; e_uuid.e_private = NULL; } switch( mode ) { case LDAP_SYNC_ADD: - if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { + if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { rs.sr_ref = get_entry_referrals( op, rs.sr_entry ); rs.sr_err = send_search_reference( op, &rs ); ber_bvarray_free( rs.sr_ref ); @@ -883,9 +901,9 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode ) break; case LDAP_SYNC_DELETE: e_uuid.e_attrs = NULL; - e_uuid.e_name = opc->sdn; - e_uuid.e_nname = opc->sndn; - if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { + e_uuid.e_name = ri->ri_dn; + e_uuid.e_nname = ri->ri_ndn; + if ( ri->ri_isref && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) { struct berval bv = BER_BVNULL; rs.sr_ref = &bv; rs.sr_err = send_search_reference( op, &rs ); @@ -906,13 +924,9 @@ syncprov_qstart( syncops *so ); static int syncprov_qplay( Operation *op, syncops *so ) { - slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key; syncres *sr; - opcookie opc; int rc = 0; - opc.son = on; - do { ldap_pvt_thread_mutex_lock( &so->s_mutex ); sr = so->s_res; @@ -930,25 +944,13 @@ syncprov_qplay( Operation *op, syncops *so ) SlapReply rs = { REP_INTERMEDIATE }; rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE, - &sr->s_csn, 0, NULL, 0 ); + &sr->s_info->ri_cookie, 0, NULL, 0 ); } else { - opc.sdn = sr->s_dn; - opc.sndn = sr->s_ndn; - opc.suuid = sr->s_uuid; - opc.sctxcsn = sr->s_csn; - opc.sreference = sr->s_isreference; - opc.se = sr->s_e; - - rc = syncprov_sendresp( op, &opc, so, sr->s_mode ); + rc = syncprov_sendresp( op, sr->s_info, so, sr->s_mode ); } } - if ( sr->s_e ) { - if ( !dec_mutexint( sr->s_e->e_private )) { - sr->s_e->e_private = NULL; - entry_free ( sr->s_e ); - } - } + free_resinfo( sr ); ch_free( sr ); if ( so->s_op->o_abandon ) @@ -967,11 +969,8 @@ syncprov_qplay( Operation *op, syncops *so ) if ( rc == 0 && so->s_res ) { syncprov_qstart( so ); - } else { - so->s_flags ^= PS_TASK_QUEUED; } - ldap_pvt_thread_mutex_unlock( &so->s_mutex ); return rc; } @@ -1007,8 +1006,17 @@ syncprov_qtask( void *ctx, void *arg ) rc = syncprov_qplay( op, so ); + /* if an error occurred, or no responses left, task is no longer queued */ + if ( !rc && !so->s_res ) + rc = 1; + /* decrement use count... */ - syncprov_free_syncop( so ); + if ( !syncprov_free_syncop( so, FS_UNLINK )) { + if ( rc ) + /* if we didn't unlink, and task is no longer queued, clear flag */ + so->s_flags ^= PS_TASK_QUEUED; + ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + } return NULL; } @@ -1019,8 +1027,8 @@ syncprov_qstart( syncops *so ) { so->s_flags |= PS_TASK_QUEUED; so->s_inuse++; - ldap_pvt_thread_pool_submit( &connection_pool, - syncprov_qtask, so ); + ldap_pvt_thread_pool_submit2( &connection_pool, + syncprov_qtask, so, &so->s_pool_cookie ); } /* Queue a persistent search response */ @@ -1028,48 +1036,79 @@ static int syncprov_qresp( opcookie *opc, syncops *so, int mode ) { syncres *sr; + resinfo *ri; int srsize; - struct berval cookie = opc->sctxcsn; + struct berval csn = opc->sctxcsn; - if ( mode == LDAP_SYNC_NEW_COOKIE ) { - syncprov_info_t *si = opc->son->on_bi.bi_private; - - slap_compose_sync_cookie( NULL, &cookie, si->si_ctxcsn, - so->s_rid, slap_serverID ? slap_serverID : -1); - } - - srsize = sizeof(syncres) + opc->suuid.bv_len + 1 + - opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1; - if ( cookie.bv_len ) - srsize += cookie.bv_len + 1; - sr = ch_malloc( srsize ); + sr = ch_malloc( sizeof( syncres )); sr->s_next = NULL; - sr->s_e = opc->se; - /* bump refcount on this entry */ - if ( opc->se ) - inc_mutexint( opc->se->e_private ); - sr->s_dn.bv_val = (char *)(sr + 1); - sr->s_dn.bv_len = opc->sdn.bv_len; sr->s_mode = mode; - sr->s_isreference = opc->sreference; - sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, - opc->sdn.bv_val ) + 1; - sr->s_ndn.bv_len = opc->sndn.bv_len; - sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, - opc->sndn.bv_val ) + 1; - sr->s_uuid.bv_len = opc->suuid.bv_len; - AC_MEMCPY( sr->s_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); - if ( cookie.bv_len ) { - sr->s_csn.bv_val = sr->s_uuid.bv_val + sr->s_uuid.bv_len + 1; - strcpy( sr->s_csn.bv_val, cookie.bv_val ); - } else { - sr->s_csn.bv_val = NULL; - } - sr->s_csn.bv_len = cookie.bv_len; + if ( !opc->ssres.s_info ) { + + srsize = sizeof( resinfo ); + if ( csn.bv_len ) + srsize += csn.bv_len + 1; + + if ( opc->se ) { + Attribute *a; + ri = ch_malloc( srsize ); + ri->ri_dn = opc->se->e_name; + ri->ri_ndn = opc->se->e_nname; + a = attr_find( opc->se->e_attrs, slap_schema.si_ad_entryUUID ); + if ( a ) + ri->ri_uuid = a->a_nvals[0]; + else + ri->ri_uuid.bv_len = 0; + if ( csn.bv_len ) { + ri->ri_csn.bv_val = (char *)(ri + 1); + ri->ri_csn.bv_len = csn.bv_len; + memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len ); + ri->ri_csn.bv_val[csn.bv_len] = '\0'; + } else { + ri->ri_csn.bv_val = NULL; + } + } else { + srsize += opc->suuid.bv_len + + opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1; + ri = ch_malloc( srsize ); + ri->ri_dn.bv_val = (char *)(ri + 1); + ri->ri_dn.bv_len = opc->sdn.bv_len; + ri->ri_ndn.bv_val = lutil_strcopy( ri->ri_dn.bv_val, + opc->sdn.bv_val ) + 1; + ri->ri_ndn.bv_len = opc->sndn.bv_len; + ri->ri_uuid.bv_val = lutil_strcopy( ri->ri_ndn.bv_val, + opc->sndn.bv_val ) + 1; + ri->ri_uuid.bv_len = opc->suuid.bv_len; + AC_MEMCPY( ri->ri_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len ); + if ( csn.bv_len ) { + ri->ri_csn.bv_val = ri->ri_uuid.bv_val + ri->ri_uuid.bv_len; + memcpy( ri->ri_csn.bv_val, csn.bv_val, csn.bv_len ); + ri->ri_csn.bv_val[csn.bv_len] = '\0'; + } else { + ri->ri_csn.bv_val = NULL; + } + } + ri->ri_list = &opc->ssres; + ri->ri_e = opc->se; + ri->ri_csn.bv_len = csn.bv_len; + ri->ri_isref = opc->sreference; + BER_BVZERO( &ri->ri_cookie ); + ldap_pvt_thread_mutex_init( &ri->ri_mutex ); + opc->se = NULL; + opc->ssres.s_info = ri; + } + ri = opc->ssres.s_info; + sr->s_info = ri; + ldap_pvt_thread_mutex_lock( &ri->ri_mutex ); + sr->s_rilist = ri->ri_list; + ri->ri_list = sr; + if ( mode == LDAP_SYNC_NEW_COOKIE && BER_BVISNULL( &ri->ri_cookie )) { + syncprov_info_t *si = opc->son->on_bi.bi_private; - if ( mode == LDAP_SYNC_NEW_COOKIE && cookie.bv_val ) { - ch_free( cookie.bv_val ); + slap_compose_sync_cookie( NULL, &ri->ri_cookie, si->si_ctxcsn, + so->s_rid, slap_serverID ? slap_serverID : -1); } + ldap_pvt_thread_mutex_unlock( &ri->ri_mutex ); ldap_pvt_thread_mutex_lock( &so->s_mutex ); if ( !so->s_res ) { @@ -1104,9 +1143,7 @@ syncprov_drop_psearch( syncops *so, int lock ) if ( lock ) ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex ); } - syncprov_free_syncop( so ); - - return 0; + return syncprov_free_syncop( so, FS_LOCK ); } static int @@ -1124,15 +1161,14 @@ syncprov_op_abandon( Operation *op, SlapReply *rs ) { slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; syncprov_info_t *si = on->on_bi.bi_private; - syncops *so, *soprev; + syncops *so, **sop; ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); - for ( so=si->si_ops, soprev = (syncops *)&si->si_ops; so; - soprev=so, so=so->s_next ) { + for ( sop=&si->si_ops; (so = *sop); sop = &(*sop)->s_next ) { if ( so->s_op->o_connid == op->o_connid && so->s_op->o_msgid == op->orn_msgid ) { so->s_op->o_abandon = 1; - soprev->s_next = so->s_next; + *sop = so->s_next; break; } } @@ -1166,10 +1202,10 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) syncprov_info_t *si = on->on_bi.bi_private; fbase_cookie fc; - syncops *ss, *sprev, *snext; + syncops **pss; Entry *e = NULL; Attribute *a; - int rc; + int rc, gonext; struct berval newdn; int freefdn = 0; BackendDB *b0 = op->o_bd, db; @@ -1192,10 +1228,8 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on ); /* If we're sending responses now, make a copy and unlock the DB */ if ( e && !saveit ) { - if ( !opc->se ) { + if ( !opc->se ) opc->se = entry_dup( e ); - opc->se->e_private = get_mutexint(); - } overlay_entry_release_ov( op, e, 0, on ); e = opc->se; } @@ -1206,10 +1240,8 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) } else { e = op->ora_e; if ( !saveit ) { - if ( !opc->se ) { + if ( !opc->se ) opc->se = entry_dup( e ); - opc->se->e_private = get_mutexint(); - } e = opc->se; } } @@ -1229,15 +1261,15 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) } ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); - for (ss = si->si_ops, sprev = (syncops *)&si->si_ops; ss; - sprev = ss, ss=snext) + for (pss = &si->si_ops; *pss; pss = gonext ? &(*pss)->s_next : pss) { Operation op2; Opheader oh; syncmatches *sm; int found = 0; + syncops *snext, *ss = *pss; - snext = ss->s_next; + gonext = 1; if ( ss->s_op->o_abandon ) continue; @@ -1266,9 +1298,10 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) SlapReply rs = {REP_RESULT}; send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED, "search base has changed" ); - sprev->s_next = snext; - syncprov_drop_psearch( ss, 1 ); - ss = sprev; + snext = ss->s_next; + if ( syncprov_drop_psearch( ss, 1 ) ) + *pss = snext; + gonext = 0; continue; } @@ -1310,8 +1343,8 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) phase otherwise (ITS#6555) */ op2.ors_filter = ss->s_op->ors_filter->f_and->f_next; } - ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); rc = test_filter( &op2, e, op2.ors_filter ); + ldap_pvt_thread_mutex_unlock( &ss->s_mutex ); } Debug( LDAP_DEBUG_TRACE, "syncprov_matchops: sid %03x fscope %d rc %d\n", @@ -1342,7 +1375,11 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) /* Decrement s_inuse, was incremented when called * with saveit == TRUE */ - syncprov_free_syncop( ss ); + snext = ss->s_next; + if ( syncprov_free_syncop( ss, FS_LOCK ) ) { + *pss = snext; + gonext = 0; + } } } ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); @@ -1355,12 +1392,11 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) overlay_entry_release_ov( op, e, 0, on ); op->o_bd = b0; } - if ( opc->se && !saveit ) { - if ( !dec_mutexint( opc->se->e_private )) { - opc->se->e_private = NULL; + if ( !saveit ) { + if ( opc->ssres.s_info ) + free_resinfo( &opc->ssres ); + else if ( opc->se ) entry_free( opc->se ); - opc->se = NULL; - } } if ( freefdn ) { op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx ); @@ -1385,15 +1421,23 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs ) for (sm = opc->smatches; sm; sm=snext) { snext = sm->sm_next; - syncprov_free_syncop( sm->sm_op ); + syncprov_free_syncop( sm->sm_op, FS_LOCK|FS_UNLINK ); op->o_tmpfree( sm, op->o_tmpmemctx ); } /* Remove op from lock table */ mt = opc->smt; if ( mt ) { + modinst *mi = (modinst *)(opc+1), **m2; ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); - mt->mt_mods = mt->mt_mods->mi_next; + for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) { + if ( *m2 == mi ) { + *m2 = mi->mi_next; + if ( mt->mt_tail == mi ) + mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2; + break; + } + } /* If there are more, promote the next one */ if ( mt->mt_mods ) { ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); @@ -1462,6 +1506,8 @@ syncprov_checkpoint( Operation *op, slap_overinst *on ) opm.o_bd->bd_info = on->on_info->oi_orig; opm.o_managedsait = SLAP_CONTROL_NONCRITICAL; opm.o_no_schema_check = 1; + opm.o_dont_replicate = 1; + opm.o_opid = -1; opm.o_bd->be_modify( &opm, &rsm ); if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT && @@ -1507,18 +1553,21 @@ syncprov_add_slog( Operation *op ) * wipe out anything in the log if we see them. */ ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); + /* can only do this if no one else is reading the log at the moment */ + if (!sl->sl_playing) { while ( se = sl->sl_head ) { sl->sl_head = se->se_next; ch_free( se ); } sl->sl_tail = NULL; sl->sl_num = 0; + } ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); return; } /* Allocate a record. UUIDs are not NUL-terminated. */ - se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + + se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + op->o_csn.bv_len + 1 ); se->se_next = NULL; se->se_tag = op->o_tag; @@ -1563,8 +1612,9 @@ syncprov_add_slog( Operation *op ) } } sl->sl_num++; + if (!sl->sl_playing) { while ( sl->sl_num > sl->sl_size ) { - int i, j; + int i; se = sl->sl_head; sl->sl_head = se->se_next; for ( i=0; isl_numcsns; i++ ) @@ -1579,6 +1629,7 @@ syncprov_add_slog( Operation *op ) ch_free( se ); sl->sl_num--; } + } ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); } } @@ -1613,6 +1664,8 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, num = sl->sl_num; i = 0; nmods = 0; + sl->sl_playing++; + ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) + num * UUID_LEN, op->o_tmpmemctx ); @@ -1669,6 +1722,8 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN); uuids[j].bv_len = UUID_LEN; } + ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); + sl->sl_playing--; ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); ndel = i; @@ -1767,6 +1822,55 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, op->o_tmpfree( uuids, op->o_tmpmemctx ); } +static int +syncprov_new_ctxcsn( opcookie *opc, syncprov_info_t *si, int csn_changed, int numvals, BerVarray vals ) +{ + unsigned i; + int j, sid; + + for ( i=0; isi_numcsns; j++ ) { + if ( sid < si->si_sids[j] ) + break; + if ( sid == si->si_sids[j] ) { + if ( ber_bvcmp( &vals[i], &si->si_ctxcsn[j] ) > 0 ) { + ber_bvreplace( &si->si_ctxcsn[j], &vals[i] ); + csn_changed = 1; + } + break; + } + } + + if ( j == si->si_numcsns || sid != si->si_sids[j] ) { + slap_insert_csn_sids( (struct sync_cookie *)&si->si_ctxcsn, + j, sid, &vals[i] ); + csn_changed = 1; + } + } + if ( csn_changed ) + si->si_dirty = 0; + ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); + + if ( csn_changed ) { + syncops *ss; + ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); + for ( ss = si->si_ops; ss; ss = ss->s_next ) { + if ( ss->s_op->o_abandon ) + continue; + /* Send the updated csn to all syncrepl consumers, + * including the server from which it originated. + * The syncrepl consumer and syncprov provider on + * the originating server may be configured to store + * their csn values in different entries. + */ + syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE ); + } + ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); + } + return csn_changed; +} + static int syncprov_op_response( Operation *op, SlapReply *rs ) { @@ -1830,65 +1934,42 @@ syncprov_op_response( Operation *op, SlapReply *rs ) } /* Don't do any processing for consumer contextCSN updates */ - if ( op->o_dont_replicate ) { - if ( op->o_tag == LDAP_REQ_MODIFY && - op->orm_modlist->sml_op == LDAP_MOD_REPLACE && - op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) { + if ( SLAPD_SYNC_IS_SYNCCONN( op->o_connid ) && + op->o_tag == LDAP_REQ_MODIFY && + op->orm_modlist && + op->orm_modlist->sml_op == LDAP_MOD_REPLACE && + op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) { /* Catch contextCSN updates from syncrepl. We have to look at * all the attribute values, as there may be more than one csn * that changed, and only one can be passed in the csn queue. */ - Modifications *mod = op->orm_modlist; - unsigned i; - int j, sid; - - for ( i=0; isml_numvals; i++ ) { - sid = slap_parse_csn_sid( &mod->sml_values[i] ); - for ( j=0; jsi_numcsns; j++ ) { - if ( sid < si->si_sids[j] ) - break; - if ( sid == si->si_sids[j] ) { - if ( ber_bvcmp( &mod->sml_values[i], &si->si_ctxcsn[j] ) > 0 ) { - ber_bvreplace( &si->si_ctxcsn[j], &mod->sml_values[i] ); - csn_changed = 1; - } - break; - } - } - - if ( j == si->si_numcsns || sid != si->si_sids[j] ) { - slap_insert_csn_sids( (struct sync_cookie *)&si->si_ctxcsn, - j, sid, &mod->sml_values[i] ); - csn_changed = 1; - } - } + csn_changed = syncprov_new_ctxcsn( opc, si, csn_changed, + op->orm_modlist->sml_numvals, op->orm_modlist->sml_values ); + if ( csn_changed ) + si->si_numops++; + goto leave; + } + if ( op->o_dont_replicate ) { if ( csn_changed ) - si->si_dirty = 0; + si->si_numops++; ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); + goto leave; + } - if ( csn_changed ) { - syncops *ss; - ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); - for ( ss = si->si_ops; ss; ss = ss->s_next ) { - if ( ss->s_op->o_abandon ) - continue; - /* Send the updated csn to all syncrepl consumers, - * including the server from which it originated. - * The syncrepl consumer and syncprov provider on - * the originating server may be configured to store - * their csn values in different entries. - */ - syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE ); - } - ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); + /* If we're adding the context entry, parse all of its contextCSNs */ + if ( op->o_tag == LDAP_REQ_ADD && + dn_match( &op->o_req_ndn, &si->si_contextdn )) { + Attribute *a = attr_find( op->ora_e->e_attrs, slap_schema.si_ad_contextCSN ); + if ( a ) { + csn_changed = syncprov_new_ctxcsn( opc, si, csn_changed, a->a_numvals, a->a_vals ); + if ( csn_changed ) + si->si_numops++; + goto added; } - } else { - ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); - } - goto leave; } - si->si_numops++; + if ( csn_changed ) + si->si_numops++; if ( si->si_chkops || si->si_chktime ) { /* Never checkpoint adding the context entry, * it will deadlock @@ -1913,6 +1994,7 @@ syncprov_op_response( Operation *op, SlapReply *rs ) si->si_dirty = !csn_changed; ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); +added: if ( do_check ) { ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); syncprov_checkpoint( op, on ); @@ -1945,6 +2027,8 @@ syncprov_op_response( Operation *op, SlapReply *rs ) continue; syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE ); } + if ( opc->ssres.s_info ) + free_resinfo( &opc->ssres ); break; } } @@ -2080,24 +2164,38 @@ syncprov_op_mod( Operation *op, SlapReply *rs ) /* See if we're already modifying this entry... */ mtdummy.mt_dn = op->o_req_ndn; +retry: ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); if ( mt ) { ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); if ( mt->mt_mods == NULL ) { /* Cannot reuse this mt, as another thread is about - * to release it in syncprov_op_cleanup. + * to release it in syncprov_op_cleanup. Wait for them + * to finish; our own insert is required to succeed. */ ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); - mt = NULL; + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); + ldap_pvt_thread_yield(); + goto retry; } } if ( mt ) { - ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); mt->mt_tail->mi_next = mi; mt->mt_tail = mi; + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); /* wait for this op to get to head of list */ while ( mt->mt_mods != mi ) { + modinst *m2; + /* don't wait on other mods from the same thread */ + for ( m2 = mt->mt_mods; m2; m2 = m2->mi_next ) { + if ( m2->mi_op->o_threadctx == op->o_threadctx ) { + break; + } + } + if ( m2 ) + break; + ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); /* FIXME: if dynamic config can delete overlays or * databases we'll have to check for cleanup here. @@ -2113,12 +2211,21 @@ syncprov_op_mod( Operation *op, SlapReply *rs ) /* clean up if the caller is giving up */ if ( op->o_abandon ) { - modinst *m2; - for ( m2 = mt->mt_mods; m2 && m2->mi_next != mi; - m2 = m2->mi_next ); - if ( m2 ) { - m2->mi_next = mi->mi_next; - if ( mt->mt_tail == mi ) mt->mt_tail = m2; + modinst **m2; + slap_callback **sc; + for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) { + if ( *m2 == mi ) { + *m2 = mi->mi_next; + if ( mt->mt_tail == mi ) + mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2; + break; + } + } + for (sc = &op->o_callback; ; sc = &(*sc)->sc_next) { + if ( *sc == cb ) { + *sc = cb->sc_next; + break; + } } op->o_tmpfree( cb, op->o_tmpmemctx ); ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); @@ -2295,14 +2402,6 @@ syncprov_search_response( Operation *op, SlapReply *rs ) int i, sid; sid = slap_parse_csn_sid( &a->a_nvals[0] ); - /* Don't send changed entries back to the originator */ - if ( sid == srs->sr_state.sid && srs->sr_state.numcsns ) { - Debug( LDAP_DEBUG_SYNC, - "Entry %s changed by peer, ignored\n", - rs->sr_entry->e_name.bv_val, 0, 0 ); - return LDAP_SUCCESS; - } - /* If not a persistent search */ if ( !ss->ss_so ) { /* Make sure entry is less than the snapshot'd contextCSN */ @@ -2376,7 +2475,7 @@ syncprov_search_response( Operation *op, SlapReply *rs ) } else { /* It's RefreshAndPersist, transition to Persist phase */ syncprov_sendinfo( op, rs, ( ss->ss_flags & SS_PRESENT ) ? - LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, + LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, ( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL, 1, NULL, 0 ); if ( !BER_BVISNULL( &cookie )) @@ -2443,7 +2542,7 @@ syncprov_op_search( Operation *op, SlapReply *rs ) syncops so = {0}; fbase_cookie fc; opcookie opc; - slap_callback sc; + slap_callback sc = {0}; fc.fss = &so; fc.fbase = 0; @@ -2466,7 +2565,6 @@ syncprov_op_search( Operation *op, SlapReply *rs ) } sop = ch_malloc( sizeof( syncops )); *sop = so; - ldap_pvt_thread_mutex_init( &sop->s_mutex ); sop->s_rid = srs->sr_state.rid; sop->s_sid = srs->sr_state.sid; /* set refcount=2 to prevent being freed out from under us @@ -2491,12 +2589,21 @@ syncprov_op_search( Operation *op, SlapReply *rs ) ldap_pvt_thread_yield(); ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); } + if ( op->o_abandon ) { + ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); + ch_free( sop ); + return SLAPD_ABANDON; + } + ldap_pvt_thread_mutex_init( &sop->s_mutex ); sop->s_next = si->si_ops; + sop->s_si = si; si->si_ops = sop; ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); } - /* snapshot the ctxcsn */ + /* snapshot the ctxcsn + * Note: this must not be done before the psearch setup. (ITS#8365) + */ ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); numcsns = si->si_numcsns; if ( numcsns ) { @@ -2510,17 +2617,19 @@ syncprov_op_search( Operation *op, SlapReply *rs ) } dirty = si->si_dirty; ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); - + /* If we have a cookie, handle the PRESENT lookups */ if ( srs->sr_state.ctxcsn ) { sessionlog *sl; int i, j; - /* If we don't have any CSN of our own yet, pretend nothing - * has changed. + /* If we don't have any CSN of our own yet, bail out. */ - if ( !numcsns ) - goto no_change; + if ( !numcsns ) { + rs->sr_err = LDAP_UNWILLING_TO_PERFORM; + rs->sr_text = "consumer has state info but provider doesn't!"; + goto bailout; + } if ( !si->si_nopres ) do_present = SS_PRESENT; @@ -2586,6 +2695,10 @@ syncprov_op_search( Operation *op, SlapReply *rs ) /* our state is older, complain to consumer */ rs->sr_err = LDAP_UNWILLING_TO_PERFORM; rs->sr_text = "consumer state is newer than provider!"; + Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, + "consumer %d state %s is newer than provider %d state %s\n", + sids[i], srs->sr_state.ctxcsn[i].bv_val, sids[j], /* == slap_serverID */ + ctxcsn[j].bv_val); bailout: if ( sop ) { syncops **sp = &si->si_ops; @@ -2601,7 +2714,7 @@ bailout: send_ldap_result( op, rs ); return rs->sr_err; } - } + } if ( BER_BVISEMPTY( &mincsn )) { mincsn = maxcsn; minsid = maxsid; @@ -2610,7 +2723,7 @@ bailout: /* If nothing has changed, shortcut it */ if ( !changed && !dirty ) { do_present = 0; -no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { +no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { LDAPControl *ctrls[2]; ctrls[0] = NULL; @@ -2697,6 +2810,9 @@ no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { } } } else { + /* The consumer knows nothing, we know nothing. OK. */ + if (!numcsns) + goto no_change; /* No consumer state, assume something has changed */ changed = SS_CHANGED; } @@ -3101,7 +3217,7 @@ syncprov_db_open( char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ]; struct berval csn; - if ( SLAP_SYNC_SHADOW( op->o_bd )) { + if ( slap_serverID || SLAP_SYNC_SHADOW( op->o_bd )) { /* If we're also a consumer, then don't generate anything. * Wait for our provider to send it to us, or for a local * modify if we have multimaster. @@ -3144,8 +3260,8 @@ syncprov_db_close( ConfigReply *cr ) { - slap_overinst *on = (slap_overinst *) be->bd_info; - syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; + slap_overinst *on = (slap_overinst *) be->bd_info; + syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; #ifdef SLAP_CONFIG_DELETE syncops *so, *sonext; #endif /* SLAP_CONFIG_DELETE */ @@ -3176,7 +3292,10 @@ syncprov_db_close( rs.sr_err = LDAP_UNAVAILABLE; send_ldap_result( so->s_op, &rs ); sonext=so->s_next; - syncprov_drop_psearch( so, 0); + if ( so->s_flags & PS_TASK_QUEUED ) + ldap_pvt_thread_pool_retract( so->s_pool_cookie ); + if ( !syncprov_drop_psearch( so, 0 )) + so->s_si = NULL; } si->si_ops=NULL; ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); @@ -3184,7 +3303,7 @@ syncprov_db_close( overlay_unregister_control( be, LDAP_CONTROL_SYNC ); #endif /* SLAP_CONFIG_DELETE */ - return 0; + return 0; } static int