X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=servers%2Fslapd%2Foverlays%2Fsyncprov.c;h=72c9064ba35a9f15a5c40f31cc6d8cc5c5bbe501;hb=fd5ad3ef399dc281e78b06a36162a58a95518c34;hp=04aba3bf9927b881efb8476094b75bf054ee99b9;hpb=edf359795c28b216f697c776e9b7042a0b448685;p=openldap diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index 04aba3bf99..72c9064ba3 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -2,7 +2,7 @@ /* syncprov.c - syncrepl provider */ /* This work is part of OpenLDAP Software . * - * Copyright 2004-2014 The OpenLDAP Foundation. + * Copyright 2004-2017 The OpenLDAP Foundation. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -28,7 +28,9 @@ #include "config.h" #include "ldap_rq.h" +#ifdef LDAP_DEVEL #define CHECK_CSN 1 +#endif /* A modify request on a particular entry */ typedef struct modinst { @@ -69,6 +71,7 @@ typedef struct syncres { /* Record of a persistent search */ typedef struct syncops { struct syncops *s_next; + struct syncprov_info_t *s_si; struct berval s_base; /* ndn of search base */ ID s_eid; /* entryID of search base */ Operation *s_op; /* search op */ @@ -770,6 +773,7 @@ again: static void free_resinfo( syncres *sr ) { syncres **st; + int freeit = 0; ldap_pvt_thread_mutex_lock( &sr->s_info->ri_mutex ); for (st = &sr->s_info->ri_list; *st; st = &(*st)->s_rilist) { if (*st == sr) { @@ -777,8 +781,10 @@ static void free_resinfo( syncres *sr ) break; } } + if ( !sr->s_info->ri_list ) + freeit = 1; ldap_pvt_thread_mutex_unlock( &sr->s_info->ri_mutex ); - if ( !sr->s_info->ri_list ) { + if ( freeit ) { ldap_pvt_thread_mutex_destroy( &sr->s_info->ri_mutex ); if ( sr->s_info->ri_e ) entry_free( sr->s_info->ri_e ); @@ -789,7 +795,7 @@ static void free_resinfo( syncres *sr ) } static int -syncprov_free_syncop( syncops *so ) +syncprov_free_syncop( syncops *so, int unlink ) { syncres *sr, *srnext; GroupAssertion *ga, *gnext; @@ -801,6 +807,17 @@ syncprov_free_syncop( syncops *so ) return 0; } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + if ( unlink ) { + syncops **sop; + ldap_pvt_thread_mutex_lock( &so->s_si->si_ops_mutex ); + for ( sop = &so->s_si->si_ops; *sop; sop = &(*sop)->s_next ) { + if ( *sop == so ) { + *sop = so->s_next; + break; + } + } + ldap_pvt_thread_mutex_unlock( &so->s_si->si_ops_mutex ); + } if ( so->s_flags & PS_IS_DETACHED ) { filter_free( so->s_op->ors_filter ); for ( ga = so->s_op->o_groups; ga; ga=gnext ) { @@ -900,7 +917,6 @@ syncprov_qstart( syncops *so ); static int syncprov_qplay( Operation *op, syncops *so ) { - slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key; syncres *sr; int rc = 0; @@ -987,7 +1003,7 @@ syncprov_qtask( void *ctx, void *arg ) rc = syncprov_qplay( op, so ); /* decrement use count... */ - syncprov_free_syncop( so ); + syncprov_free_syncop( so, 1 ); return NULL; } @@ -998,7 +1014,7 @@ syncprov_qstart( syncops *so ) { so->s_flags |= PS_TASK_QUEUED; so->s_inuse++; - ldap_pvt_thread_pool_submit( &connection_pool, + ldap_pvt_thread_pool_submit( &connection_pool, syncprov_qtask, so ); } @@ -1114,7 +1130,7 @@ syncprov_drop_psearch( syncops *so, int lock ) if ( lock ) ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex ); } - return syncprov_free_syncop( so ); + return syncprov_free_syncop( so, 0 ); } static int @@ -1132,15 +1148,14 @@ syncprov_op_abandon( Operation *op, SlapReply *rs ) { slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; syncprov_info_t *si = on->on_bi.bi_private; - syncops *so, *soprev; + syncops *so, **sop; ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); - for ( so=si->si_ops, soprev = (syncops *)&si->si_ops; so; - soprev=so, so=so->s_next ) { + for ( sop=&si->si_ops; (so = *sop); sop = &(*sop)->s_next ) { if ( so->s_op->o_connid == op->o_connid && so->s_op->o_msgid == op->orn_msgid ) { so->s_op->o_abandon = 1; - soprev->s_next = so->s_next; + *sop = so->s_next; break; } } @@ -1348,7 +1363,7 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) * with saveit == TRUE */ snext = ss->s_next; - if ( syncprov_free_syncop( ss ) ) { + if ( syncprov_free_syncop( ss, 0 ) ) { *pss = snext; gonext = 0; } @@ -1393,15 +1408,23 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs ) for (sm = opc->smatches; sm; sm=snext) { snext = sm->sm_next; - syncprov_free_syncop( sm->sm_op ); + syncprov_free_syncop( sm->sm_op, 1 ); op->o_tmpfree( sm, op->o_tmpmemctx ); } /* Remove op from lock table */ mt = opc->smt; if ( mt ) { + modinst *mi = (modinst *)(opc+1), **m2; ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); - mt->mt_mods = mt->mt_mods->mi_next; + for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) { + if ( *m2 == mi ) { + *m2 = mi->mi_next; + if ( mt->mt_tail == mi ) + mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2; + break; + } + } /* If there are more, promote the next one */ if ( mt->mt_mods ) { ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); @@ -1470,6 +1493,7 @@ syncprov_checkpoint( Operation *op, slap_overinst *on ) opm.o_bd->bd_info = on->on_info->oi_orig; opm.o_managedsait = SLAP_CONTROL_NONCRITICAL; opm.o_no_schema_check = 1; + opm.o_opid = -1; opm.o_bd->be_modify( &opm, &rsm ); if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT && @@ -1526,7 +1550,7 @@ syncprov_add_slog( Operation *op ) } /* Allocate a record. UUIDs are not NUL-terminated. */ - se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + + se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + op->o_csn.bv_len + 1 ); se->se_next = NULL; se->se_tag = op->o_tag; @@ -1572,7 +1596,7 @@ syncprov_add_slog( Operation *op ) } sl->sl_num++; while ( sl->sl_num > sl->sl_size ) { - int i, j; + int i; se = sl->sl_head; sl->sl_head = se->se_next; for ( i=0; isl_numcsns; i++ ) @@ -1893,10 +1917,13 @@ syncprov_op_response( Operation *op, SlapReply *rs ) } else { ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); } + if ( csn_changed ) + si->si_numops++; goto leave; } - si->si_numops++; + if ( csn_changed ) + si->si_numops++; if ( si->si_chkops || si->si_chktime ) { /* Never checkpoint adding the context entry, * it will deadlock @@ -1953,6 +1980,8 @@ syncprov_op_response( Operation *op, SlapReply *rs ) continue; syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE ); } + if ( opc->ssres.s_info ) + free_resinfo( &opc->ssres ); break; } } @@ -2088,24 +2117,38 @@ syncprov_op_mod( Operation *op, SlapReply *rs ) /* See if we're already modifying this entry... */ mtdummy.mt_dn = op->o_req_ndn; +retry: ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); if ( mt ) { ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); if ( mt->mt_mods == NULL ) { /* Cannot reuse this mt, as another thread is about - * to release it in syncprov_op_cleanup. + * to release it in syncprov_op_cleanup. Wait for them + * to finish; our own insert is required to succeed. */ ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); - mt = NULL; + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); + ldap_pvt_thread_yield(); + goto retry; } } if ( mt ) { - ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); mt->mt_tail->mi_next = mi; mt->mt_tail = mi; + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); /* wait for this op to get to head of list */ while ( mt->mt_mods != mi ) { + modinst *m2; + /* don't wait on other mods from the same thread */ + for ( m2 = mt->mt_mods; m2; m2 = m2->mi_next ) { + if ( m2->mi_op->o_threadctx == op->o_threadctx ) { + break; + } + } + if ( m2 ) + break; + ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); /* FIXME: if dynamic config can delete overlays or * databases we'll have to check for cleanup here. @@ -2121,12 +2164,21 @@ syncprov_op_mod( Operation *op, SlapReply *rs ) /* clean up if the caller is giving up */ if ( op->o_abandon ) { - modinst *m2; - for ( m2 = mt->mt_mods; m2 && m2->mi_next != mi; - m2 = m2->mi_next ); - if ( m2 ) { - m2->mi_next = mi->mi_next; - if ( mt->mt_tail == mi ) mt->mt_tail = m2; + modinst **m2; + slap_callback **sc; + for (m2 = &mt->mt_mods; ; m2 = &(*m2)->mi_next) { + if ( *m2 == mi ) { + *m2 = mi->mi_next; + if ( mt->mt_tail == mi ) + mt->mt_tail = ( m2 == &mt->mt_mods ) ? NULL : (modinst *)m2; + break; + } + } + for (sc = &op->o_callback; ; sc = &(*sc)->sc_next) { + if ( *sc == cb ) { + *sc = cb->sc_next; + break; + } } op->o_tmpfree( cb, op->o_tmpmemctx ); ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); @@ -2384,7 +2436,7 @@ syncprov_search_response( Operation *op, SlapReply *rs ) } else { /* It's RefreshAndPersist, transition to Persist phase */ syncprov_sendinfo( op, rs, ( ss->ss_flags & SS_PRESENT ) ? - LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, + LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, ( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL, 1, NULL, 0 ); if ( !BER_BVISNULL( &cookie )) @@ -2451,7 +2503,7 @@ syncprov_op_search( Operation *op, SlapReply *rs ) syncops so = {0}; fbase_cookie fc; opcookie opc; - slap_callback sc; + slap_callback sc = {0}; fc.fss = &so; fc.fbase = 0; @@ -2474,7 +2526,6 @@ syncprov_op_search( Operation *op, SlapReply *rs ) } sop = ch_malloc( sizeof( syncops )); *sop = so; - ldap_pvt_thread_mutex_init( &sop->s_mutex ); sop->s_rid = srs->sr_state.rid; sop->s_sid = srs->sr_state.sid; /* set refcount=2 to prevent being freed out from under us @@ -2499,12 +2550,21 @@ syncprov_op_search( Operation *op, SlapReply *rs ) ldap_pvt_thread_yield(); ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); } + if ( op->o_abandon ) { + ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); + ch_free( sop ); + return SLAPD_ABANDON; + } + ldap_pvt_thread_mutex_init( &sop->s_mutex ); sop->s_next = si->si_ops; + sop->s_si = si; si->si_ops = sop; ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); } - /* snapshot the ctxcsn */ + /* snapshot the ctxcsn + * Note: this must not be done before the psearch setup. (ITS#8365) + */ ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); numcsns = si->si_numcsns; if ( numcsns ) { @@ -2518,17 +2578,19 @@ syncprov_op_search( Operation *op, SlapReply *rs ) } dirty = si->si_dirty; ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); - + /* If we have a cookie, handle the PRESENT lookups */ if ( srs->sr_state.ctxcsn ) { sessionlog *sl; int i, j; - /* If we don't have any CSN of our own yet, pretend nothing - * has changed. + /* If we don't have any CSN of our own yet, bail out. */ - if ( !numcsns ) - goto no_change; + if ( !numcsns ) { + rs->sr_err = LDAP_UNWILLING_TO_PERFORM; + rs->sr_text = "consumer has state info but provider doesn't!"; + goto bailout; + } if ( !si->si_nopres ) do_present = SS_PRESENT; @@ -2594,6 +2656,10 @@ syncprov_op_search( Operation *op, SlapReply *rs ) /* our state is older, complain to consumer */ rs->sr_err = LDAP_UNWILLING_TO_PERFORM; rs->sr_text = "consumer state is newer than provider!"; + Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, + "consumer %d state %s is newer than provider %d state %s\n", + sids[i], srs->sr_state.ctxcsn[i].bv_val, sids[j], /* == slap_serverID */ + ctxcsn[j].bv_val); bailout: if ( sop ) { syncops **sp = &si->si_ops; @@ -2609,7 +2675,7 @@ bailout: send_ldap_result( op, rs ); return rs->sr_err; } - } + } if ( BER_BVISEMPTY( &mincsn )) { mincsn = maxcsn; minsid = maxsid; @@ -2618,7 +2684,7 @@ bailout: /* If nothing has changed, shortcut it */ if ( !changed && !dirty ) { do_present = 0; -no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { +no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { LDAPControl *ctrls[2]; ctrls[0] = NULL; @@ -2705,6 +2771,9 @@ no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { } } } else { + /* The consumer knows nothing, we know nothing. OK. */ + if (!numcsns) + goto no_change; /* No consumer state, assume something has changed */ changed = SS_CHANGED; } @@ -3109,7 +3178,7 @@ syncprov_db_open( char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ]; struct berval csn; - if ( SLAP_SYNC_SHADOW( op->o_bd )) { + if ( slap_serverID || SLAP_SYNC_SHADOW( op->o_bd )) { /* If we're also a consumer, then don't generate anything. * Wait for our provider to send it to us, or for a local * modify if we have multimaster. @@ -3152,8 +3221,8 @@ syncprov_db_close( ConfigReply *cr ) { - slap_overinst *on = (slap_overinst *) be->bd_info; - syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; + slap_overinst *on = (slap_overinst *) be->bd_info; + syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; #ifdef SLAP_CONFIG_DELETE syncops *so, *sonext; #endif /* SLAP_CONFIG_DELETE */ @@ -3192,7 +3261,7 @@ syncprov_db_close( overlay_unregister_control( be, LDAP_CONTROL_SYNC ); #endif /* SLAP_CONFIG_DELETE */ - return 0; + return 0; } static int