From 08564beb6bd816a4a6e1397898d9fc82baa4e138 Mon Sep 17 00:00:00 2001 From: Howard Chu Date: Sat, 27 Nov 2004 07:05:24 +0000 Subject: [PATCH] Serialize multiple modifies of the same entry (ala seqmod.c) --- servers/slapd/overlays/syncprov.c | 102 +++++++++++++++++++++++++++++- 1 file changed, 99 insertions(+), 3 deletions(-) diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index 8b35d5ee34..6563590976 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -25,6 +25,21 @@ #include "lutil.h" #include "slap.h" +/* A modify request on a particular entry */ +typedef struct modinst { + struct modinst *mi_next; + Operation *mi_op; +} modinst; + +typedef struct modtarget { + struct modinst *mt_mods; + struct modinst *mt_tail; + Operation *mt_op; + ldap_pvt_thread_mutex_t mt_mutex; +} modtarget; + + + /* A queued result of a persistent search */ typedef struct syncres { struct syncres *s_next; @@ -81,8 +96,10 @@ typedef struct syncprov_info_t { syncops *si_ops; struct berval si_ctxcsn; /* ldapsync context */ int si_gotcsn; /* is our ctxcsn up to date? */ + Avlnode *si_mods; /* entries being modified */ ldap_pvt_thread_mutex_t si_csn_mutex; ldap_pvt_thread_mutex_t si_ops_mutex; + ldap_pvt_thread_mutex_t si_mods_mutex; char si_ctxcsnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE]; } syncprov_info_t; @@ -335,6 +352,20 @@ syncprov_sendinfo( return LDAP_SUCCESS; } + +static int +sp_avl_cmp( const void *c1, const void *c2 ) +{ + const modtarget *m1, *m2; + int rc; + + m1 = c1; m2 = c2; + rc = m1->mt_op->o_req_ndn.bv_len - m2->mt_op->o_req_ndn.bv_len; + + if ( rc ) return rc; + return ber_bvcmp( &m1->mt_op->o_req_ndn, &m2->mt_op->o_req_ndn ); +} + /* syncprov_findbase: * finds the true DN of the base of a search (with alias dereferencing) and * checks to make sure the base entry doesn't get replaced with a different @@ -870,13 +901,38 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs ) { slap_callback *cb = op->o_callback; opcookie *opc = cb->sc_private; + slap_overinst *on = opc->son; + syncprov_info_t *si = on->on_bi.bi_private; syncmatches *sm, *snext; + modtarget *mt, mtdummy; for (sm = opc->smatches; sm; sm=snext) { snext = sm->sm_next; syncprov_free_syncop( sm->sm_op ); op->o_tmpfree( sm, op->o_tmpmemctx ); } + + /* Remove op from lock table */ + mtdummy.mt_op = op; + ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); + mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); + if ( mt ) { + modinst *mi = mt->mt_mods; + + /* If there are more, promote the next one */ + ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); + if ( mi->mi_next ) { + mt->mt_mods = mi->mi_next; + mt->mt_op = mt->mt_mods->mi_op; + ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); + } else { + avl_delete( &si->si_mods, mt, sp_avl_cmp ); + ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); + ldap_pvt_thread_mutex_destroy( &mt->mt_mutex ); + ch_free( mt ); + } + } + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); op->o_callback = cb->sc_next; op->o_tmpfree(cb, op->o_tmpmemctx); } @@ -1039,7 +1095,10 @@ syncprov_op_mod( Operation *op, SlapReply *rs ) slap_overinst *on = (slap_overinst *)op->o_bd->bd_info; syncprov_info_t *si = on->on_bi.bi_private; - slap_callback *cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(opcookie), op->o_tmpmemctx); + slap_callback *cb = op->o_tmpcalloc(1, sizeof(slap_callback)+ + sizeof(opcookie) + + (si->si_ops ? sizeof(modinst) : 0 ), + op->o_tmpmemctx); opcookie *opc = (opcookie *)(cb+1); opc->son = on; cb->sc_response = syncprov_op_response; @@ -1048,8 +1107,45 @@ syncprov_op_mod( Operation *op, SlapReply *rs ) cb->sc_next = op->o_callback; op->o_callback = cb; - if ( si->si_ops && op->o_tag != LDAP_REQ_ADD ) - syncprov_matchops( op, opc, 1 ); + /* If there are active persistent searches, lock this operation. + * See seqmod.c for the locking logic on its own. + */ + if ( si->si_ops ) { + modtarget *mt, mtdummy; + modinst *mi; + + mi = (modinst *)(opc+1); + mi->mi_op = op; + + /* See if we're already modifying this entry... */ + mtdummy.mt_op = op; + ldap_pvt_thread_mutex_lock( &si->si_mods_mutex ); + mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp ); + if ( mt ) { + ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); + mt->mt_tail->mi_next = mi; + mt->mt_tail = mi; + /* wait for this op to get to head of list */ + while ( mt->mt_mods != mi ) { + ldap_pvt_thread_mutex_unlock( &mt->mt_mutex ); + ldap_pvt_thread_yield(); + ldap_pvt_thread_mutex_lock( &mt->mt_mutex ); + } + } else { + /* Record that we're modifying this entry now */ + mt = malloc( sizeof(modtarget) ); + mt->mt_mods = mi; + mt->mt_tail = mi; + mt->mt_op = mi->mi_op; + ldap_pvt_thread_mutex_init( &mt->mt_mutex ); + avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error ); + ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex ); + } + + if ( op->o_tag != LDAP_REQ_ADD ) + syncprov_matchops( op, opc, 1 ); + } return SLAP_CB_CONTINUE; } -- 2.39.5