]> git.sur5r.net Git - openldap/blobdiff - servers/slapd/overlays/syncprov.c
ITS#6710
[openldap] / servers / slapd / overlays / syncprov.c
index 60db65e1b8ddeb06d3be7bd40228b8810b336169..7fd374f016aa7b9eecf25400710dd39479c92611 100644 (file)
@@ -133,6 +133,9 @@ typedef struct syncprov_info_t {
        int             si_numops;      /* number of ops since last checkpoint */
        int             si_nopres;      /* Skip present phase */
        int             si_usehint;     /* use reload hint */
+       int             si_active;      /* True if there are active mods */
+       int             si_dirty;       /* True if the context is dirty, i.e changes
+                                                * have been made without updating the csn. */
        time_t  si_chklast;     /* time of last checkpoint */
        Avlnode *si_mods;       /* entries being modified */
        sessionlog      *si_logs;
@@ -1293,6 +1296,7 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
                }
 
                if ( fc.fscope ) {
+                       ldap_pvt_thread_mutex_lock( &ss->s_mutex );
                        op2 = *ss->s_op;
                        oh = *op->o_hdr;
                        oh.oh_conn = ss->s_op->o_conn;
@@ -1301,7 +1305,6 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
                        op2.o_hdr = &oh;
                        op2.o_extra = op->o_extra;
                        op2.o_callback = NULL;
-                       ldap_pvt_thread_mutex_lock( &ss->s_mutex );
                        if (ss->s_flags & PS_FIX_FILTER) {
                                /* Skip the AND/GE clause that we stuck on in front. We
                                   would lose deletes/mods that happen during the refresh
@@ -1376,6 +1379,11 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs )
        syncmatches *sm, *snext;
        modtarget *mt, mtdummy;
 
+       ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+       if ( si->si_active )
+               si->si_active--;
+       ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+
        for (sm = opc->smatches; sm; sm=snext) {
                snext = sm->sm_next;
                syncprov_free_syncop( sm->sm_op );
@@ -1806,6 +1814,8 @@ syncprov_op_response( Operation *op, SlapReply *rs )
                                        csn_changed = 1;
                                }
                        }
+                       if ( csn_changed )
+                               si->si_dirty = 0;
                        ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
 
                        if ( csn_changed ) {
@@ -1849,6 +1859,7 @@ syncprov_op_response( Operation *op, SlapReply *rs )
                                }
                        }
                }
+               si->si_dirty = !csn_changed;
                ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
 
                if ( do_check ) {
@@ -1976,6 +1987,7 @@ syncprov_op_mod( Operation *op, SlapReply *rs )
 
        ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
        have_psearches = ( si->si_ops != NULL );
+       si->si_active++;
        ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
 
        cbsize = sizeof(slap_callback) + sizeof(opcookie) +
@@ -2366,6 +2378,7 @@ syncprov_op_search( Operation *op, SlapReply *rs )
        BerVarray ctxcsn;
        int i, *sids, numcsns;
        struct berval mincsn;
+       int dirty = 0;
 
        if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
 
@@ -2410,6 +2423,20 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                sop->s_inuse = 1;
 
                ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+               while ( si->si_active ) {
+                       /* Wait for active mods to finish before proceeding, as they
+                        * may already have inspected the si_ops list looking for
+                        * consumers to replicate the change to.  Using the log
+                        * doesn't help, as we may finish playing it before the
+                        * active mods gets added to it.
+                        */
+                       ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+                       if ( slapd_shutdown )
+                               return SLAPD_ABANDON;
+                       if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool ))
+                               ldap_pvt_thread_yield();
+                       ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+               }
                sop->s_next = si->si_ops;
                si->si_ops = sop;
                ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
@@ -2427,6 +2454,7 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                ctxcsn = NULL;
                sids = NULL;
        }
+       dirty = si->si_dirty;
        ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
        
        /* If we have a cookie, handle the PRESENT lookups */
@@ -2506,7 +2534,7 @@ bailout:
                                if ( changed )
                                        break;
                        }
-                       if ( !changed ) {
+                       if ( !changed && !dirty ) {
                                do_present = 0;
 no_change:             if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
                                        LDAPControl     *ctrls[2];
@@ -2590,7 +2618,7 @@ shortcut:
        }
 
        /* If something changed, find the changes */
-       if ( gotstate && changed ) {
+       if ( gotstate && ( changed || dirty ) ) {
                Filter *fand, *fava;
 
                fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
@@ -2606,10 +2634,14 @@ shortcut:
 #endif
                ber_dupbv_x( &fava->f_ava->aa_value, &mincsn, op->o_tmpmemctx );
                fava->f_next = op->ors_filter;
+               if ( sop )
+                       ldap_pvt_thread_mutex_lock( &sop->s_mutex );
                op->ors_filter = fand;
                filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
-               if ( sop )
+               if ( sop ) {
                        sop->s_flags |= PS_FIX_FILTER;
+                       ldap_pvt_thread_mutex_unlock( &sop->s_mutex );
+               }
        }
 
        /* Let our callback add needed info to returned entries */
@@ -2631,7 +2663,7 @@ shortcut:
         * the refresh phase, just invoke the response callback to transition
         * us into persist phase
         */
-       if ( !changed ) {
+       if ( !changed && !dirty ) {
                rs->sr_err = LDAP_SUCCESS;
                rs->sr_nentries = 0;
                send_ldap_result( op, rs );