]> git.sur5r.net Git - openldap/blobdiff - servers/slapd/overlays/syncprov.c
Serialize multiple modifies of the same entry (ala seqmod.c)
[openldap] / servers / slapd / overlays / syncprov.c
index 8b35d5ee347a3e90596d4edb9cf30ff02a6d594c..65635909761372442f57443679bf8f8e492b0903 100644 (file)
 #include "lutil.h"
 #include "slap.h"
 
+/* A modify request on a particular entry */
+typedef struct modinst {
+       struct modinst *mi_next;
+       Operation *mi_op;
+} modinst;
+
+typedef struct modtarget {
+       struct modinst *mt_mods;
+       struct modinst *mt_tail;
+       Operation *mt_op;
+       ldap_pvt_thread_mutex_t mt_mutex;
+} modtarget;
+
+       
+
 /* A queued result of a persistent search */
 typedef struct syncres {
        struct syncres *s_next;
@@ -81,8 +96,10 @@ typedef struct syncprov_info_t {
        syncops         *si_ops;
        struct berval   si_ctxcsn;      /* ldapsync context */
        int             si_gotcsn;      /* is our ctxcsn up to date? */
+       Avlnode *si_mods;       /* entries being modified */
        ldap_pvt_thread_mutex_t si_csn_mutex;
        ldap_pvt_thread_mutex_t si_ops_mutex;
+       ldap_pvt_thread_mutex_t si_mods_mutex;
        char            si_ctxcsnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
 } syncprov_info_t;
 
@@ -335,6 +352,20 @@ syncprov_sendinfo(
 
        return LDAP_SUCCESS;
 }
+
+static int
+sp_avl_cmp( const void *c1, const void *c2 )
+{
+       const modtarget *m1, *m2;
+       int rc;
+
+       m1 = c1; m2 = c2;
+       rc = m1->mt_op->o_req_ndn.bv_len - m2->mt_op->o_req_ndn.bv_len;
+
+       if ( rc ) return rc;
+       return ber_bvcmp( &m1->mt_op->o_req_ndn, &m2->mt_op->o_req_ndn );
+}
+
 /* syncprov_findbase:
  *   finds the true DN of the base of a search (with alias dereferencing) and
  * checks to make sure the base entry doesn't get replaced with a different
@@ -870,13 +901,38 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs )
 {
        slap_callback *cb = op->o_callback;
        opcookie *opc = cb->sc_private;
+       slap_overinst *on = opc->son;
+       syncprov_info_t         *si = on->on_bi.bi_private;
        syncmatches *sm, *snext;
+       modtarget *mt, mtdummy;
 
        for (sm = opc->smatches; sm; sm=snext) {
                snext = sm->sm_next;
                syncprov_free_syncop( sm->sm_op );
                op->o_tmpfree( sm, op->o_tmpmemctx );
        }
+
+       /* Remove op from lock table */
+       mtdummy.mt_op = op;
+       ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
+       mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp );
+       if ( mt ) {
+               modinst *mi = mt->mt_mods;
+               
+               /* If there are more, promote the next one */
+               ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
+               if ( mi->mi_next ) {
+                       mt->mt_mods = mi->mi_next;
+                       mt->mt_op = mt->mt_mods->mi_op;
+                       ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
+               } else {
+                       avl_delete( &si->si_mods, mt, sp_avl_cmp );
+                       ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
+                       ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );
+                       ch_free( mt );
+               }
+       }
+       ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
        op->o_callback = cb->sc_next;
        op->o_tmpfree(cb, op->o_tmpmemctx);
 }
@@ -1039,7 +1095,10 @@ syncprov_op_mod( Operation *op, SlapReply *rs )
        slap_overinst           *on = (slap_overinst *)op->o_bd->bd_info;
        syncprov_info_t         *si = on->on_bi.bi_private;
 
-       slap_callback *cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(opcookie), op->o_tmpmemctx);
+       slap_callback *cb = op->o_tmpcalloc(1, sizeof(slap_callback)+
+               sizeof(opcookie) +
+               (si->si_ops ? sizeof(modinst) : 0 ),
+               op->o_tmpmemctx);
        opcookie *opc = (opcookie *)(cb+1);
        opc->son = on;
        cb->sc_response = syncprov_op_response;
@@ -1048,8 +1107,45 @@ syncprov_op_mod( Operation *op, SlapReply *rs )
        cb->sc_next = op->o_callback;
        op->o_callback = cb;
 
-       if ( si->si_ops && op->o_tag != LDAP_REQ_ADD )
-               syncprov_matchops( op, opc, 1 );
+       /* If there are active persistent searches, lock this operation.
+        * See seqmod.c for the locking logic on its own.
+        */
+       if ( si->si_ops ) {
+               modtarget *mt, mtdummy;
+               modinst *mi;
+
+               mi = (modinst *)(opc+1);
+               mi->mi_op = op;
+
+               /* See if we're already modifying this entry... */
+               mtdummy.mt_op = op;
+               ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
+               mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp );
+               if ( mt ) {
+                       ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
+                       ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
+                       mt->mt_tail->mi_next = mi;
+                       mt->mt_tail = mi;
+                       /* wait for this op to get to head of list */
+                       while ( mt->mt_mods != mi ) {
+                               ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
+                               ldap_pvt_thread_yield();
+                               ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
+                       }
+               } else {
+                       /* Record that we're modifying this entry now */
+                       mt = malloc( sizeof(modtarget) );
+                       mt->mt_mods = mi;
+                       mt->mt_tail = mi;
+                       mt->mt_op = mi->mi_op;
+                       ldap_pvt_thread_mutex_init( &mt->mt_mutex );
+                       avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error );
+                       ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
+               }
+
+               if ( op->o_tag != LDAP_REQ_ADD )
+                       syncprov_matchops( op, opc, 1 );
+       }
 
        return SLAP_CB_CONTINUE;
 }