]> git.sur5r.net Git - openldap/blobdiff - servers/slapd/overlays/syncprov.c
ITS#6710
[openldap] / servers / slapd / overlays / syncprov.c
index d3584a22384fcae33f8d79a84c622b826fb43313..7fd374f016aa7b9eecf25400710dd39479c92611 100644 (file)
@@ -133,6 +133,9 @@ typedef struct syncprov_info_t {
        int             si_numops;      /* number of ops since last checkpoint */
        int             si_nopres;      /* Skip present phase */
        int             si_usehint;     /* use reload hint */
+       int             si_active;      /* True if there are active mods */
+       int             si_dirty;       /* True if the context is dirty, i.e changes
+                                                * have been made without updating the csn. */
        time_t  si_chklast;     /* time of last checkpoint */
        Avlnode *si_mods;       /* entries being modified */
        sessionlog      *si_logs;
@@ -591,8 +594,8 @@ syncprov_findcsn( Operation *op, find_csn_t mode )
        slap_callback cb = {0};
        Operation fop;
        SlapReply frs = { REP_RESULT };
-       char buf[LDAP_LUTIL_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")];
-       char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
+       char buf[LDAP_PVT_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")];
+       char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
        struct berval maxcsn;
        Filter cf;
        AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
@@ -931,9 +934,9 @@ syncprov_qplay( Operation *op, syncops *so )
                ldap_pvt_thread_mutex_unlock( &so->s_mutex );
 
                if ( sr->s_mode == LDAP_SYNC_NEW_COOKIE ) {
-                   SlapReply rs = { REP_INTERMEDIATE };
+                       SlapReply rs = { REP_INTERMEDIATE };
 
-                   rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE,
+                       rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE,
                                &sr->s_csn, 0, NULL, 0 );
                } else {
                        opc.sdn = sr->s_dn;
@@ -945,11 +948,11 @@ syncprov_qplay( Operation *op, syncops *so )
 
                        rc = syncprov_sendresp( op, &opc, so, sr->s_mode );
 
-                       if ( opc.se ) {
-                               if ( !dec_mutexint( opc.se->e_private )) {
-                                       opc.se->e_private = NULL;
-                                       entry_free ( opc.se );
-                               }
+               }
+               if ( sr->s_e ) {
+                       if ( !dec_mutexint( sr->s_e->e_private )) {
+                               sr->s_e->e_private = NULL;
+                               entry_free ( sr->s_e );
                        }
                }
 
@@ -1293,6 +1296,7 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
                }
 
                if ( fc.fscope ) {
+                       ldap_pvt_thread_mutex_lock( &ss->s_mutex );
                        op2 = *ss->s_op;
                        oh = *op->o_hdr;
                        oh.oh_conn = ss->s_op->o_conn;
@@ -1301,7 +1305,14 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
                        op2.o_hdr = &oh;
                        op2.o_extra = op->o_extra;
                        op2.o_callback = NULL;
-                       rc = test_filter( &op2, e, ss->s_op->ors_filter );
+                       if (ss->s_flags & PS_FIX_FILTER) {
+                               /* Skip the AND/GE clause that we stuck on in front. We
+                                  would lose deletes/mods that happen during the refresh
+                                  phase otherwise (ITS#6555) */
+                               op2.ors_filter = ss->s_op->ors_filter->f_and->f_next;
+                       }
+                       ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
+                       rc = test_filter( &op2, e, op2.ors_filter );
                }
 
                Debug( LDAP_DEBUG_TRACE, "syncprov_matchops: sid %03x fscope %d rc %d\n",
@@ -1368,6 +1379,11 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs )
        syncmatches *sm, *snext;
        modtarget *mt, mtdummy;
 
+       ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+       if ( si->si_active )
+               si->si_active--;
+       ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+
        for (sm = opc->smatches; sm; sm=snext) {
                snext = sm->sm_next;
                syncprov_free_syncop( sm->sm_op );
@@ -1413,6 +1429,7 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
        SlapReply rsm = { 0 };
        slap_callback cb = {0};
        BackendDB be;
+       BackendInfo *bi;
 
 #ifdef CHECK_CSN
        Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
@@ -1442,6 +1459,7 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
        }
        opm.o_req_dn = si->si_contextdn;
        opm.o_req_ndn = si->si_contextdn;
+       bi = opm.o_bd->bd_info;
        opm.o_bd->bd_info = on->on_info->oi_orig;
        opm.o_managedsait = SLAP_CONTROL_NONCRITICAL;
        opm.o_no_schema_check = 1;
@@ -1459,6 +1477,7 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
                if ( e == opm.ora_e )
                        be_entry_release_w( &opm, opm.ora_e );
        }
+       opm.o_bd->bd_info = bi;
 
        if ( mod.sml_next != NULL ) {
                slap_mods_free( mod.sml_next, 1 );
@@ -1535,7 +1554,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
        slap_overinst           *on = (slap_overinst *)op->o_bd->bd_info;
        slog_entry *se;
        int i, j, ndel, num, nmods, mmods;
-       char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
+       char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
        BerVarray uuids;
        struct berval delcsn[2];
 
@@ -1710,7 +1729,7 @@ syncprov_op_response( Operation *op, SlapReply *rs )
        if ( rs->sr_err == LDAP_SUCCESS )
        {
                struct berval maxcsn;
-               char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
+               char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
                int do_check = 0, have_psearches, foundit, csn_changed = 0;
 
                ldap_pvt_thread_mutex_lock( &si->si_resp_mutex );
@@ -1795,6 +1814,8 @@ syncprov_op_response( Operation *op, SlapReply *rs )
                                        csn_changed = 1;
                                }
                        }
+                       if ( csn_changed )
+                               si->si_dirty = 0;
                        ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
 
                        if ( csn_changed ) {
@@ -1838,6 +1859,7 @@ syncprov_op_response( Operation *op, SlapReply *rs )
                                }
                        }
                }
+               si->si_dirty = !csn_changed;
                ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
 
                if ( do_check ) {
@@ -1965,6 +1987,7 @@ syncprov_op_mod( Operation *op, SlapReply *rs )
 
        ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
        have_psearches = ( si->si_ops != NULL );
+       si->si_active++;
        ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
 
        cbsize = sizeof(slap_callback) + sizeof(opcookie) +
@@ -2355,6 +2378,7 @@ syncprov_op_search( Operation *op, SlapReply *rs )
        BerVarray ctxcsn;
        int i, *sids, numcsns;
        struct berval mincsn;
+       int dirty = 0;
 
        if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
 
@@ -2399,6 +2423,20 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                sop->s_inuse = 1;
 
                ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+               while ( si->si_active ) {
+                       /* Wait for active mods to finish before proceeding, as they
+                        * may already have inspected the si_ops list looking for
+                        * consumers to replicate the change to.  Using the log
+                        * doesn't help, as we may finish playing it before the
+                        * active mods gets added to it.
+                        */
+                       ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+                       if ( slapd_shutdown )
+                               return SLAPD_ABANDON;
+                       if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool ))
+                               ldap_pvt_thread_yield();
+                       ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+               }
                sop->s_next = si->si_ops;
                si->si_ops = sop;
                ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
@@ -2416,6 +2454,7 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                ctxcsn = NULL;
                sids = NULL;
        }
+       dirty = si->si_dirty;
        ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
        
        /* If we have a cookie, handle the PRESENT lookups */
@@ -2472,8 +2511,9 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                                        if ( newer < 0 )
                                                changed = SS_CHANGED;
                                        else if ( newer > 0 ) {
-                                       /* our state is older, tell consumer nothing */
-                                               rs->sr_err = LDAP_SUCCESS;
+                                       /* our state is older, complain to consumer */
+                                               rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
+                                               rs->sr_text = "consumer state is newer than provider!";
 bailout:
                                                if ( sop ) {
                                                        syncops **sp = &si->si_ops;
@@ -2494,7 +2534,7 @@ bailout:
                                if ( changed )
                                        break;
                        }
-                       if ( !changed ) {
+                       if ( !changed && !dirty ) {
                                do_present = 0;
 no_change:             if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
                                        LDAPControl     *ctrls[2];
@@ -2578,7 +2618,7 @@ shortcut:
        }
 
        /* If something changed, find the changes */
-       if ( gotstate && changed ) {
+       if ( gotstate && ( changed || dirty ) ) {
                Filter *fand, *fava;
 
                fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
@@ -2594,10 +2634,14 @@ shortcut:
 #endif
                ber_dupbv_x( &fava->f_ava->aa_value, &mincsn, op->o_tmpmemctx );
                fava->f_next = op->ors_filter;
+               if ( sop )
+                       ldap_pvt_thread_mutex_lock( &sop->s_mutex );
                op->ors_filter = fand;
                filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
-               if ( sop )
+               if ( sop ) {
                        sop->s_flags |= PS_FIX_FILTER;
+                       ldap_pvt_thread_mutex_unlock( &sop->s_mutex );
+               }
        }
 
        /* Let our callback add needed info to returned entries */
@@ -2619,7 +2663,7 @@ shortcut:
         * the refresh phase, just invoke the response callback to transition
         * us into persist phase
         */
-       if ( !changed ) {
+       if ( !changed && !dirty ) {
                rs->sr_err = LDAP_SUCCESS;
                rs->sr_nentries = 0;
                send_ldap_result( op, rs );
@@ -2860,7 +2904,7 @@ sp_cf_gen(ConfigArgs *c)
                }
                sl = si->si_logs;
                if ( !sl ) {
-                       sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE );
+                       sl = ch_malloc( sizeof( sessionlog ) + LDAP_PVT_CSNSTR_BUFSIZE );
                        sl->sl_mincsn.bv_val = (char *)(sl+1);
                        sl->sl_mincsn.bv_len = 0;
                        sl->sl_num = 0;
@@ -2966,7 +3010,7 @@ syncprov_db_open(
 
        /* Didn't find a contextCSN, should we generate one? */
        if ( !si->si_ctxcsn ) {
-               char csnbuf[ LDAP_LUTIL_CSNSTR_BUFSIZE ];
+               char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ];
                struct berval csn;
 
                if ( SLAP_SYNC_SHADOW( op->o_bd )) {