From: Howard Chu Date: Mon, 20 Jun 2011 03:03:01 +0000 (-0700) Subject: delta-mmr conflict resolution X-Git-Url: https://git.sur5r.net/?p=openldap;a=commitdiff_plain;h=67bbad6e704840fecdacdc45ce536719dd5173f2 delta-mmr conflict resolution --- diff --git a/servers/slapd/syncrepl.c b/servers/slapd/syncrepl.c index c40b36d8b8..e69959fd16 100644 --- a/servers/slapd/syncrepl.c +++ b/servers/slapd/syncrepl.c @@ -154,6 +154,36 @@ static int null_callback( Operation *, SlapReply * ); static AttributeDescription *sync_descs[4]; +/* delta-mmr */ +static AttributeDescription *ad_reqMod, *ad_reqDN; + +typedef struct logschema { + struct berval ls_dn; + struct berval ls_req; + struct berval ls_mod; + struct berval ls_newRdn; + struct berval ls_delRdn; + struct berval ls_newSup; +} logschema; + +static logschema changelog_sc = { + BER_BVC("targetDN"), + BER_BVC("changeType"), + BER_BVC("changes"), + BER_BVC("newRDN"), + BER_BVC("deleteOldRDN"), + BER_BVC("newSuperior") +}; + +static logschema accesslog_sc = { + BER_BVC("reqDN"), + BER_BVC("reqType"), + BER_BVC("reqMod"), + BER_BVC("reqNewRDN"), + BER_BVC("reqDeleteOldRDN"), + BER_BVC("reqNewSuperior") +}; + static const char * syncrepl_state2str( int state ) { @@ -194,6 +224,13 @@ init_syncrepl(syncinfo_t *si) if ( si->si_syncdata && !overlay_is_inst( si->si_be, syncrepl_ov.on_bi.bi_type )) { overlay_config( si->si_be, syncrepl_ov.on_bi.bi_type, -1, NULL, NULL ); + if ( !ad_reqMod ) { + const char *text; + logschema *ls = &accesslog_sc; + + slap_bv2ad( &ls->ls_mod, &ad_reqMod, &text ); + slap_bv2ad( &ls->ls_dn, &ad_reqDN, &text ); + } } if ( !sync_descs[0] ) { @@ -349,33 +386,6 @@ init_syncrepl(syncinfo_t *si) si->si_exattrs = exattrs; } -typedef struct logschema { - struct berval ls_dn; - struct berval ls_req; - struct berval ls_mod; - struct berval ls_newRdn; - struct berval ls_delRdn; - struct berval ls_newSup; -} logschema; - -static logschema changelog_sc = { - BER_BVC("targetDN"), - BER_BVC("changeType"), - BER_BVC("changes"), - BER_BVC("newRDN"), - BER_BVC("deleteOldRDN"), - BER_BVC("newSuperior") -}; - -static logschema accesslog_sc = { - BER_BVC("reqDN"), - BER_BVC("reqType"), - BER_BVC("reqMod"), - BER_BVC("reqNewRDN"), - BER_BVC("reqDeleteOldRDN"), - BER_BVC("reqNewSuperior") -}; - static int ldap_sync_search( syncinfo_t *si, @@ -1769,6 +1779,193 @@ typedef struct OpExtraSync { syncinfo_t *oe_si; } OpExtraSync; +/* Copy the original modlist, split Replace ops into Delete/Add, + * and drop mod opattrs since this modification is in the past. + */ +static Modifications *mods_dup( Operation *op, Modifications *modlist ) +{ + Modifications *mod, *modnew = NULL, *modtail = NULL; + int size; + for ( ; modlist; modlist = modlist->sml_next ) { + if ( modlist->sml_desc == slap_schema.si_ad_modifiersName || + modlist->sml_desc == slap_schema.si_ad_modifyTimestamp || + modlist->sml_desc == slap_schema.si_ad_entryCSN ) + continue; + if ( modlist->sml_op == LDAP_MOD_REPLACE ) { + mod = op->o_tmpalloc( sizeof(Modifications), op->o_tmpmemctx ); + mod->sml_desc = modlist->sml_desc; + mod->sml_values = NULL; + mod->sml_nvalues = NULL; + mod->sml_op = LDAP_MOD_DELETE; + mod->sml_numvals = 0; + mod->sml_flags = 0; + if ( !modnew ) + modnew = mod; + if ( modtail ) + modtail->sml_next = mod; + modtail = mod; + } + if ( modlist->sml_numvals ) { + size = (modlist->sml_numvals+1) * sizeof(struct berval); + if ( modlist->sml_nvalues ) size *= 2; + } else { + size = 0; + } + size += sizeof(Modifications); + mod = op->o_tmpalloc( size, op->o_tmpmemctx ); + if ( !modnew ) + modnew = mod; + if ( modtail ) + modtail->sml_next = mod; + modtail = mod; + mod->sml_desc = modlist->sml_desc; + mod->sml_numvals = modlist->sml_numvals; + mod->sml_flags = 0; + if ( modlist->sml_numvals ) { + int i; + mod->sml_values = (BerVarray)(mod+1); + for (i=0; isml_numvals; i++) + mod->sml_values[i] = modlist->sml_values[i]; + if ( modlist->sml_nvalues ) { + mod->sml_nvalues = mod->sml_values + mod->sml_numvals + 1; + for (i=0; isml_numvals; i++) + mod->sml_nvalues[i] = modlist->sml_nvalues[i]; + } + } + if ( modlist->sml_op == LDAP_MOD_REPLACE ) + mod->sml_op = LDAP_MOD_ADD; + else + mod->sml_op = modlist->sml_op; + mod->sml_next = NULL; + } + return modnew; +} + +typedef struct resolve_ctxt { + syncinfo_t *rx_si; + Modifications *rx_mods; +} resolve_ctxt; + +static void +compare_vals( Modifications *m1, Modifications *m2 ) +{ + int i, j; + struct berval *bv1, *bv2; + + if ( m2->sml_nvalues ) { + bv2 = m2->sml_nvalues; + bv1 = m1->sml_nvalues; + } else { + bv2 = m2->sml_values; + bv1 = m1->sml_values; + } + for ( j=0; jsml_numvals; j++ ) { + for ( i=0; isml_numvals; i++ ) { + if ( !ber_bvcmp( &bv1[i], &bv2[j] )) { + int k; + for ( k=i; ksml_numvals-1; k++ ) { + m1->sml_values[k] = m1->sml_values[k+1]; + if ( m1->sml_nvalues ) + m1->sml_nvalues[k] = m1->sml_nvalues[k+1]; + } + m1->sml_numvals--; + i--; + } + } + } +} + +static int +syncrepl_resolve_cb( Operation *op, SlapReply *rs ) +{ + if ( rs->sr_type == REP_SEARCH ) { + resolve_ctxt *rx = op->o_callback->sc_private; + Attribute *a = attr_find( rs->sr_entry->e_attrs, ad_reqMod ); + if ( a ) { + Modifications *oldmods, *newmods, *m1, *m2, **prev; + oldmods = rx->rx_mods; + syncrepl_accesslog_mods( rx->rx_si, a->a_vals, &newmods ); + for ( m2 = newmods; m2; m2=m2->sml_next ) { + for ( prev = &oldmods, m1 = *prev; m1; m1 = *prev ) { + if ( m1->sml_desc != m2->sml_desc ) { + prev = &m1->sml_next; + continue; + } + if ( m2->sml_op == LDAP_MOD_DELETE || + m2->sml_op == LDAP_MOD_REPLACE ) { + int numvals = m2->sml_numvals; + if ( m2->sml_op == LDAP_MOD_REPLACE ) + numvals = 0; + /* New delete All cancels everything */ + if ( numvals == 0 ) { +drop: + *prev = m1->sml_next; + op->o_tmpfree( m1, op->o_tmpmemctx ); + continue; + } + if ( m1->sml_op == LDAP_MOD_DELETE ) { + if ( m1->sml_numvals == 0 ) { + /* turn this to SOFTDEL later */ + m1->sml_flags = SLAP_MOD_INTERNAL; + } else { + compare_vals( m1, m2 ); + if ( !m1->sml_numvals ) + goto drop; + } + } else if ( m1->sml_op == LDAP_MOD_ADD ) { + compare_vals( m1, m2 ); + if ( !m1->sml_numvals ) + goto drop; + } + } + + if ( m2->sml_op == LDAP_MOD_ADD || + m2->sml_op == LDAP_MOD_REPLACE ) { + if ( m1->sml_op == LDAP_MOD_DELETE ) { + if ( !m1->sml_numvals ) goto drop; + compare_vals( m1, m2 ); + if ( !m1->sml_numvals ) + goto drop; + } + if ( m2->sml_desc->ad_type->sat_atype.at_single_value ) + goto drop; + compare_vals( m1, m2 ); + if ( !m1->sml_numvals ) + goto drop; + } + prev = &m1->sml_next; + } + } + slap_mods_free( newmods, 1 ); + } + } + return LDAP_SUCCESS; +} + +typedef struct modify_ctxt { + Modifications *mx_orig; + Modifications *mx_free; +} modify_ctxt; + +static int +syncrepl_modify_cb( Operation *op, SlapReply *rs ) +{ + slap_callback *sc = op->o_callback; + modify_ctxt *mx = sc->sc_private; + Modifications *ml; + + op->o_callback = sc->sc_next; + op->o_tmpfree( sc, op->o_tmpmemctx ); + + op->orm_no_opattrs = 0; + op->orm_modlist = mx->mx_orig; + for ( ml = mx->mx_free; ml; ml = mx->mx_free ) { + mx->mx_free = ml->sml_next; + op->o_tmpfree( ml, op->o_tmpmemctx ); + } + return SLAP_CB_CONTINUE; +} + static int syncrepl_op_modify( Operation *op, SlapReply *rs ) { @@ -1777,7 +1974,7 @@ syncrepl_op_modify( Operation *op, SlapReply *rs ) syncinfo_t *si; Entry *e; int rc, match = 0; - Modifications *mod; + Modifications *mod, *newlist; LDAP_SLIST_FOREACH( oex, &op->o_extra, oe_next ) { if ( oex->oe_key == (void *)syncrepl_message_to_op ) @@ -1827,6 +2024,7 @@ syncrepl_op_modify( Operation *op, SlapReply *rs ) * old new * delete all delete all drop * delete all delete X SOFTDEL + * delete X delete all drop * delete X delete X drop * delete X delete Y OK * delete all add X drop @@ -1841,6 +2039,74 @@ syncrepl_op_modify( Operation *op, SlapReply *rs ) * that accesslog logs the original mod. */ + newlist = mods_dup( op, op->orm_modlist ); + + { + Operation op2 = *op; + AttributeName an[2]; + const char *text; + slap_callback cb; + struct berval bv; + char *ptr; + int size, rc; + SlapReply rs1 = {0}; + resolve_ctxt rx = { si, newlist }; + + op2.o_tag = LDAP_REQ_SEARCH; + op2.ors_scope = LDAP_SCOPE_SUBTREE; + op2.ors_deref = LDAP_DEREF_NEVER; + op2.o_req_dn = si->si_logbase; + op2.o_req_ndn = si->si_logbase; + op2.ors_tlimit = SLAP_NO_LIMIT; + op2.ors_slimit = SLAP_NO_LIMIT; + op2.ors_limit = NULL; + memset( an, 0, sizeof(an)); + an[0].an_desc = ad_reqMod; + an[0].an_name = ad_reqMod->ad_cname; + op2.ors_attrs = an; + op2.ors_attrsonly = 0; + + bv = mod->sml_nvalues[0]; + ptr = strchr(bv.bv_val, '.'); + bv.bv_len = ptr - bv.bv_val; + + size = sizeof("(&(reqStart>=)(reqDN=))"); + size += bv.bv_len + op->o_req_ndn.bv_len + si->si_logfilterstr.bv_len; + op2.ors_filterstr.bv_val = op->o_tmpalloc( size, op->o_tmpmemctx ); + op2.ors_filterstr.bv_len = sprintf(op2.ors_filterstr.bv_val, + "(&(reqStart>=%.*s)(reqDN=%s)%s)", + (int)bv.bv_len, bv.bv_val, op->o_req_ndn.bv_val, si->si_logfilterstr.bv_val ); + op2.ors_filter = str2filter_x( op, op2.ors_filterstr.bv_val ); + + op2.o_callback = &cb; + cb.sc_response = syncrepl_resolve_cb; + cb.sc_private = ℞ + op2.o_bd = select_backend( &op2.o_req_ndn, 1 ); + op2.o_bd->be_search( &op2, &rs1 ); + newlist = rx.rx_mods; + } + + { + slap_callback *sc = op->o_tmpalloc( sizeof(slap_callback) + + sizeof(modify_ctxt), op->o_tmpmemctx ); + modify_ctxt *mx = (modify_ctxt *)(sc+1); + Modifications *ml; + + sc->sc_response = syncrepl_modify_cb; + sc->sc_private = mx; + sc->sc_next = op->o_callback; + op->o_callback = sc; + op->orm_no_opattrs = 1; + mx->mx_orig = op->orm_modlist; + mx->mx_free = newlist; + for ( ml = newlist; ml; ml=ml->sml_next ) { + if ( ml->sml_flags == SLAP_MOD_INTERNAL ) { + ml->sml_flags = 0; + ml->sml_op = SLAP_MOD_SOFTDEL; + } + } + op->orm_modlist = newlist; + } return SLAP_CB_CONTINUE; }