X-Git-Url: https://git.sur5r.net/?a=blobdiff_plain;f=servers%2Fslapd%2Foverlays%2Fsyncprov.c;h=d521ff97794ada722dc250f6e3f5e369f4fff3ec;hb=9069cbe543d079e9d7f55162decb8e732fc32338;hp=69be484165452ff6d7e4b4491188250ab1ef277c;hpb=83734af7cb3d6293fe6d8303a0bf4d484fe7b41e;p=openldap diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index 69be484165..d521ff9779 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -2,7 +2,7 @@ /* syncprov.c - syncrepl provider */ /* This work is part of OpenLDAP Software . * - * Copyright 2004-2015 The OpenLDAP Foundation. + * Copyright 2004-2018 The OpenLDAP Foundation. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -28,7 +28,9 @@ #include "config.h" #include "ldap_rq.h" +#ifdef LDAP_DEVEL #define CHECK_CSN 1 +#endif /* A modify request on a particular entry */ typedef struct modinst { @@ -87,6 +89,7 @@ typedef struct syncops { int s_inuse; /* reference count */ struct syncres *s_res; struct syncres *s_restail; + void *s_pool_cookie; ldap_pvt_thread_mutex_t s_mutex; } syncops; @@ -128,6 +131,7 @@ typedef struct sessionlog { int sl_numcsns; int sl_num; int sl_size; + int sl_playing; slog_entry *sl_head; slog_entry *sl_tail; ldap_pvt_thread_mutex_t sl_mutex; @@ -792,20 +796,25 @@ static void free_resinfo( syncres *sr ) } } +#define FS_UNLINK 1 +#define FS_LOCK 2 + static int -syncprov_free_syncop( syncops *so, int unlink ) +syncprov_free_syncop( syncops *so, int flags ) { syncres *sr, *srnext; GroupAssertion *ga, *gnext; - ldap_pvt_thread_mutex_lock( &so->s_mutex ); + if ( flags & FS_LOCK ) + ldap_pvt_thread_mutex_lock( &so->s_mutex ); /* already being freed, or still in use */ if ( !so->s_inuse || --so->s_inuse > 0 ) { - ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + if ( flags & FS_LOCK ) + ldap_pvt_thread_mutex_unlock( &so->s_mutex ); return 0; } ldap_pvt_thread_mutex_unlock( &so->s_mutex ); - if ( unlink ) { + if (( flags & FS_UNLINK ) && so->s_si ) { syncops **sop; ldap_pvt_thread_mutex_lock( &so->s_si->si_ops_mutex ); for ( sop = &so->s_si->si_ops; *sop; sop = &(*sop)->s_next ) { @@ -915,7 +924,6 @@ syncprov_qstart( syncops *so ); static int syncprov_qplay( Operation *op, syncops *so ) { - slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key; syncres *sr; int rc = 0; @@ -961,11 +969,8 @@ syncprov_qplay( Operation *op, syncops *so ) if ( rc == 0 && so->s_res ) { syncprov_qstart( so ); - } else { - so->s_flags ^= PS_TASK_QUEUED; } - ldap_pvt_thread_mutex_unlock( &so->s_mutex ); return rc; } @@ -1001,8 +1006,17 @@ syncprov_qtask( void *ctx, void *arg ) rc = syncprov_qplay( op, so ); + /* if an error occurred, or no responses left, task is no longer queued */ + if ( !rc && !so->s_res ) + rc = 1; + /* decrement use count... */ - syncprov_free_syncop( so, 1 ); + if ( !syncprov_free_syncop( so, FS_UNLINK )) { + if ( rc ) + /* if we didn't unlink, and task is no longer queued, clear flag */ + so->s_flags ^= PS_TASK_QUEUED; + ldap_pvt_thread_mutex_unlock( &so->s_mutex ); + } return NULL; } @@ -1013,8 +1027,8 @@ syncprov_qstart( syncops *so ) { so->s_flags |= PS_TASK_QUEUED; so->s_inuse++; - ldap_pvt_thread_pool_submit( &connection_pool, - syncprov_qtask, so ); + ldap_pvt_thread_pool_submit2( &connection_pool, + syncprov_qtask, so, &so->s_pool_cookie ); } /* Queue a persistent search response */ @@ -1129,7 +1143,7 @@ syncprov_drop_psearch( syncops *so, int lock ) if ( lock ) ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex ); } - return syncprov_free_syncop( so, 0 ); + return syncprov_free_syncop( so, FS_LOCK ); } static int @@ -1362,7 +1376,7 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit ) * with saveit == TRUE */ snext = ss->s_next; - if ( syncprov_free_syncop( ss, 0 ) ) { + if ( syncprov_free_syncop( ss, FS_LOCK ) ) { *pss = snext; gonext = 0; } @@ -1407,7 +1421,7 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs ) for (sm = opc->smatches; sm; sm=snext) { snext = sm->sm_next; - syncprov_free_syncop( sm->sm_op, 1 ); + syncprov_free_syncop( sm->sm_op, FS_LOCK|FS_UNLINK ); op->o_tmpfree( sm, op->o_tmpmemctx ); } @@ -1492,6 +1506,8 @@ syncprov_checkpoint( Operation *op, slap_overinst *on ) opm.o_bd->bd_info = on->on_info->oi_orig; opm.o_managedsait = SLAP_CONTROL_NONCRITICAL; opm.o_no_schema_check = 1; + opm.o_dont_replicate = 1; + opm.o_opid = -1; opm.o_bd->be_modify( &opm, &rsm ); if ( rsm.sr_err == LDAP_NO_SUCH_OBJECT && @@ -1537,18 +1553,21 @@ syncprov_add_slog( Operation *op ) * wipe out anything in the log if we see them. */ ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); + /* can only do this if no one else is reading the log at the moment */ + if (!sl->sl_playing) { while ( se = sl->sl_head ) { sl->sl_head = se->se_next; ch_free( se ); } sl->sl_tail = NULL; sl->sl_num = 0; + } ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); return; } /* Allocate a record. UUIDs are not NUL-terminated. */ - se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + + se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + op->o_csn.bv_len + 1 ); se->se_next = NULL; se->se_tag = op->o_tag; @@ -1593,8 +1612,9 @@ syncprov_add_slog( Operation *op ) } } sl->sl_num++; + if (!sl->sl_playing) { while ( sl->sl_num > sl->sl_size ) { - int i, j; + int i; se = sl->sl_head; sl->sl_head = se->se_next; for ( i=0; isl_numcsns; i++ ) @@ -1609,6 +1629,7 @@ syncprov_add_slog( Operation *op ) ch_free( se ); sl->sl_num--; } + } ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); } } @@ -1643,6 +1664,8 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, num = sl->sl_num; i = 0; nmods = 0; + sl->sl_playing++; + ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) + num * UUID_LEN, op->o_tmpmemctx ); @@ -1699,6 +1722,8 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN); uuids[j].bv_len = UUID_LEN; } + ldap_pvt_thread_mutex_lock( &sl->sl_mutex ); + sl->sl_playing--; ldap_pvt_thread_mutex_unlock( &sl->sl_mutex ); ndel = i; @@ -1797,6 +1822,55 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl, op->o_tmpfree( uuids, op->o_tmpmemctx ); } +static int +syncprov_new_ctxcsn( opcookie *opc, syncprov_info_t *si, int csn_changed, int numvals, BerVarray vals ) +{ + unsigned i; + int j, sid; + + for ( i=0; isi_numcsns; j++ ) { + if ( sid < si->si_sids[j] ) + break; + if ( sid == si->si_sids[j] ) { + if ( ber_bvcmp( &vals[i], &si->si_ctxcsn[j] ) > 0 ) { + ber_bvreplace( &si->si_ctxcsn[j], &vals[i] ); + csn_changed = 1; + } + break; + } + } + + if ( j == si->si_numcsns || sid != si->si_sids[j] ) { + slap_insert_csn_sids( (struct sync_cookie *)&si->si_ctxcsn, + j, sid, &vals[i] ); + csn_changed = 1; + } + } + if ( csn_changed ) + si->si_dirty = 0; + ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); + + if ( csn_changed ) { + syncops *ss; + ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); + for ( ss = si->si_ops; ss; ss = ss->s_next ) { + if ( ss->s_op->o_abandon ) + continue; + /* Send the updated csn to all syncrepl consumers, + * including the server from which it originated. + * The syncrepl consumer and syncprov provider on + * the originating server may be configured to store + * their csn values in different entries. + */ + syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE ); + } + ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); + } + return csn_changed; +} + static int syncprov_op_response( Operation *op, SlapReply *rs ) { @@ -1860,66 +1934,40 @@ syncprov_op_response( Operation *op, SlapReply *rs ) } /* Don't do any processing for consumer contextCSN updates */ - if ( op->o_dont_replicate ) { - if ( op->o_tag == LDAP_REQ_MODIFY && - op->orm_modlist->sml_op == LDAP_MOD_REPLACE && - op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) { + if ( SLAPD_SYNC_IS_SYNCCONN( op->o_connid ) && + op->o_tag == LDAP_REQ_MODIFY && + op->orm_modlist && + op->orm_modlist->sml_op == LDAP_MOD_REPLACE && + op->orm_modlist->sml_desc == slap_schema.si_ad_contextCSN ) { /* Catch contextCSN updates from syncrepl. We have to look at * all the attribute values, as there may be more than one csn * that changed, and only one can be passed in the csn queue. */ - Modifications *mod = op->orm_modlist; - unsigned i; - int j, sid; - - for ( i=0; isml_numvals; i++ ) { - sid = slap_parse_csn_sid( &mod->sml_values[i] ); - for ( j=0; jsi_numcsns; j++ ) { - if ( sid < si->si_sids[j] ) - break; - if ( sid == si->si_sids[j] ) { - if ( ber_bvcmp( &mod->sml_values[i], &si->si_ctxcsn[j] ) > 0 ) { - ber_bvreplace( &si->si_ctxcsn[j], &mod->sml_values[i] ); - csn_changed = 1; - } - break; - } - } - - if ( j == si->si_numcsns || sid != si->si_sids[j] ) { - slap_insert_csn_sids( (struct sync_cookie *)&si->si_ctxcsn, - j, sid, &mod->sml_values[i] ); - csn_changed = 1; - } - } + csn_changed = syncprov_new_ctxcsn( opc, si, csn_changed, + op->orm_modlist->sml_numvals, op->orm_modlist->sml_values ); if ( csn_changed ) - si->si_dirty = 0; - ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); - - if ( csn_changed ) { - syncops *ss; - ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); - for ( ss = si->si_ops; ss; ss = ss->s_next ) { - if ( ss->s_op->o_abandon ) - continue; - /* Send the updated csn to all syncrepl consumers, - * including the server from which it originated. - * The syncrepl consumer and syncprov provider on - * the originating server may be configured to store - * their csn values in different entries. - */ - syncprov_qresp( opc, ss, LDAP_SYNC_NEW_COOKIE ); - } - ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); - } - } else { - ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); - } + si->si_numops++; + goto leave; + } + if ( op->o_dont_replicate ) { if ( csn_changed ) si->si_numops++; + ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); goto leave; } + /* If we're adding the context entry, parse all of its contextCSNs */ + if ( op->o_tag == LDAP_REQ_ADD && + dn_match( &op->o_req_ndn, &si->si_contextdn )) { + Attribute *a = attr_find( op->ora_e->e_attrs, slap_schema.si_ad_contextCSN ); + if ( a ) { + csn_changed = syncprov_new_ctxcsn( opc, si, csn_changed, a->a_numvals, a->a_vals ); + if ( csn_changed ) + si->si_numops++; + goto added; + } + } + if ( csn_changed ) si->si_numops++; if ( si->si_chkops || si->si_chktime ) { @@ -1946,6 +1994,7 @@ syncprov_op_response( Operation *op, SlapReply *rs ) si->si_dirty = !csn_changed; ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock ); +added: if ( do_check ) { ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); syncprov_checkpoint( op, on ); @@ -1978,6 +2027,8 @@ syncprov_op_response( Operation *op, SlapReply *rs ) continue; syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE ); } + if ( opc->ssres.s_info ) + free_resinfo( &opc->ssres ); break; } } @@ -2351,14 +2402,6 @@ syncprov_search_response( Operation *op, SlapReply *rs ) int i, sid; sid = slap_parse_csn_sid( &a->a_nvals[0] ); - /* Don't send changed entries back to the originator */ - if ( sid == srs->sr_state.sid && srs->sr_state.numcsns ) { - Debug( LDAP_DEBUG_SYNC, - "Entry %s changed by peer, ignored\n", - rs->sr_entry->e_name.bv_val, 0, 0 ); - return LDAP_SUCCESS; - } - /* If not a persistent search */ if ( !ss->ss_so ) { /* Make sure entry is less than the snapshot'd contextCSN */ @@ -2432,7 +2475,7 @@ syncprov_search_response( Operation *op, SlapReply *rs ) } else { /* It's RefreshAndPersist, transition to Persist phase */ syncprov_sendinfo( op, rs, ( ss->ss_flags & SS_PRESENT ) ? - LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, + LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE, ( ss->ss_flags & SS_CHANGED ) ? &cookie : NULL, 1, NULL, 0 ); if ( !BER_BVISNULL( &cookie )) @@ -2492,28 +2535,6 @@ syncprov_op_search( Operation *op, SlapReply *rs ) return rs->sr_err; } - /* snapshot the ctxcsn */ - ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); - numcsns = si->si_numcsns; - if ( numcsns ) { - ber_bvarray_dup_x( &ctxcsn, si->si_ctxcsn, op->o_tmpmemctx ); - sids = op->o_tmpalloc( numcsns * sizeof(int), op->o_tmpmemctx ); - for ( i=0; isi_sids[i]; - } else { - ctxcsn = NULL; - sids = NULL; - } - dirty = si->si_dirty; - ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); - - /* We know nothing - do nothing */ - if ( !numcsns ) { - rs->sr_err = LDAP_SUCCESS; - send_ldap_result( op, rs ); - return rs->sr_err; - } - srs = op->o_controls[slap_cids.sc_LDAPsync]; /* If this is a persistent search, set it up right away */ @@ -2521,7 +2542,7 @@ syncprov_op_search( Operation *op, SlapReply *rs ) syncops so = {0}; fbase_cookie fc; opcookie opc; - slap_callback sc; + slap_callback sc = {0}; fc.fss = &so; fc.fbase = 0; @@ -2544,7 +2565,6 @@ syncprov_op_search( Operation *op, SlapReply *rs ) } sop = ch_malloc( sizeof( syncops )); *sop = so; - ldap_pvt_thread_mutex_init( &sop->s_mutex ); sop->s_rid = srs->sr_state.rid; sop->s_sid = srs->sr_state.sid; /* set refcount=2 to prevent being freed out from under us @@ -2569,22 +2589,47 @@ syncprov_op_search( Operation *op, SlapReply *rs ) ldap_pvt_thread_yield(); ldap_pvt_thread_mutex_lock( &si->si_ops_mutex ); } + if ( op->o_abandon ) { + ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); + ch_free( sop ); + return SLAPD_ABANDON; + } + ldap_pvt_thread_mutex_init( &sop->s_mutex ); sop->s_next = si->si_ops; sop->s_si = si; si->si_ops = sop; ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); } + /* snapshot the ctxcsn + * Note: this must not be done before the psearch setup. (ITS#8365) + */ + ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock ); + numcsns = si->si_numcsns; + if ( numcsns ) { + ber_bvarray_dup_x( &ctxcsn, si->si_ctxcsn, op->o_tmpmemctx ); + sids = op->o_tmpalloc( numcsns * sizeof(int), op->o_tmpmemctx ); + for ( i=0; isi_sids[i]; + } else { + ctxcsn = NULL; + sids = NULL; + } + dirty = si->si_dirty; + ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock ); + /* If we have a cookie, handle the PRESENT lookups */ if ( srs->sr_state.ctxcsn ) { sessionlog *sl; int i, j; - /* If we don't have any CSN of our own yet, pretend nothing - * has changed. + /* If we don't have any CSN of our own yet, bail out. */ - if ( !numcsns ) - goto no_change; + if ( !numcsns ) { + rs->sr_err = LDAP_UNWILLING_TO_PERFORM; + rs->sr_text = "consumer has state info but provider doesn't!"; + goto bailout; + } if ( !si->si_nopres ) do_present = SS_PRESENT; @@ -2650,6 +2695,10 @@ syncprov_op_search( Operation *op, SlapReply *rs ) /* our state is older, complain to consumer */ rs->sr_err = LDAP_UNWILLING_TO_PERFORM; rs->sr_text = "consumer state is newer than provider!"; + Log4( LDAP_DEBUG_SYNC, ldap_syslog_level, + "consumer %d state %s is newer than provider %d state %s\n", + sids[i], srs->sr_state.ctxcsn[i].bv_val, sids[j], /* == slap_serverID */ + ctxcsn[j].bv_val); bailout: if ( sop ) { syncops **sp = &si->si_ops; @@ -2665,7 +2714,7 @@ bailout: send_ldap_result( op, rs ); return rs->sr_err; } - } + } if ( BER_BVISEMPTY( &mincsn )) { mincsn = maxcsn; minsid = maxsid; @@ -2674,7 +2723,7 @@ bailout: /* If nothing has changed, shortcut it */ if ( !changed && !dirty ) { do_present = 0; -no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { +no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { LDAPControl *ctrls[2]; ctrls[0] = NULL; @@ -2761,6 +2810,9 @@ no_change: if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) { } } } else { + /* The consumer knows nothing, we know nothing. OK. */ + if (!numcsns) + goto no_change; /* No consumer state, assume something has changed */ changed = SS_CHANGED; } @@ -2987,22 +3039,13 @@ sp_cf_gen(ConfigArgs *c) si->si_chktime = 0; break; case SP_SESSL: - if ( si->si_logs ) - si->si_logs->sl_size = 0; - else - rc = LDAP_NO_SUCH_ATTRIBUTE; + si->si_logs->sl_size = 0; break; case SP_NOPRES: - if ( si->si_nopres ) - si->si_nopres = 0; - else - rc = LDAP_NO_SUCH_ATTRIBUTE; + si->si_nopres = 0; break; case SP_USEHINT: - if ( si->si_usehint ) - si->si_usehint = 0; - else - rc = LDAP_NO_SUCH_ATTRIBUTE; + si->si_usehint = 0; break; } return rc; @@ -3208,8 +3251,8 @@ syncprov_db_close( ConfigReply *cr ) { - slap_overinst *on = (slap_overinst *) be->bd_info; - syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; + slap_overinst *on = (slap_overinst *) be->bd_info; + syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; #ifdef SLAP_CONFIG_DELETE syncops *so, *sonext; #endif /* SLAP_CONFIG_DELETE */ @@ -3240,7 +3283,10 @@ syncprov_db_close( rs.sr_err = LDAP_UNAVAILABLE; send_ldap_result( so->s_op, &rs ); sonext=so->s_next; - syncprov_drop_psearch( so, 0); + if ( so->s_flags & PS_TASK_QUEUED ) + ldap_pvt_thread_pool_retract( so->s_pool_cookie ); + if ( !syncprov_drop_psearch( so, 0 )) + so->s_si = NULL; } si->si_ops=NULL; ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex ); @@ -3248,7 +3294,7 @@ syncprov_db_close( overlay_unregister_control( be, LDAP_CONTROL_SYNC ); #endif /* SLAP_CONFIG_DELETE */ - return 0; + return 0; } static int