]> git.sur5r.net Git - openldap/blobdiff - servers/slapd/overlays/syncprov.c
ITS#6710
[openldap] / servers / slapd / overlays / syncprov.c
index 4249af7168556bd6c8fe1beffca8e580aa8d585e..7fd374f016aa7b9eecf25400710dd39479c92611 100644 (file)
@@ -2,7 +2,7 @@
 /* syncprov.c - syncrepl provider */
 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
  *
- * Copyright 2004-2009 The OpenLDAP Foundation.
+ * Copyright 2004-2010 The OpenLDAP Foundation.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -133,6 +133,9 @@ typedef struct syncprov_info_t {
        int             si_numops;      /* number of ops since last checkpoint */
        int             si_nopres;      /* Skip present phase */
        int             si_usehint;     /* use reload hint */
+       int             si_active;      /* True if there are active mods */
+       int             si_dirty;       /* True if the context is dirty, i.e changes
+                                                * have been made without updating the csn. */
        time_t  si_chklast;     /* time of last checkpoint */
        Avlnode *si_mods;       /* entries being modified */
        sessionlog      *si_logs;
@@ -591,8 +594,8 @@ syncprov_findcsn( Operation *op, find_csn_t mode )
        slap_callback cb = {0};
        Operation fop;
        SlapReply frs = { REP_RESULT };
-       char buf[LDAP_LUTIL_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")];
-       char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
+       char buf[LDAP_PVT_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")];
+       char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
        struct berval maxcsn;
        Filter cf;
        AttributeAssertion eq = ATTRIBUTEASSERTION_INIT;
@@ -853,9 +856,14 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode )
        }
 
        rs.sr_ctrls = ctrls;
+       rs.sr_entry = &e_uuid;
+       if ( mode == LDAP_SYNC_ADD || mode == LDAP_SYNC_MODIFY ) {
+               e_uuid = *opc->se;
+               e_uuid.e_private = NULL;
+       }
+
        switch( mode ) {
        case LDAP_SYNC_ADD:
-               rs.sr_entry = opc->se;
                if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
                        rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
                        rs.sr_err = send_search_reference( op, &rs );
@@ -864,7 +872,6 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode )
                }
                /* fallthru */
        case LDAP_SYNC_MODIFY:
-               rs.sr_entry = opc->se;
                rs.sr_attrs = op->ors_attrs;
                rs.sr_err = send_search_entry( op, &rs );
                break;
@@ -872,7 +879,6 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, int mode )
                e_uuid.e_attrs = NULL;
                e_uuid.e_name = opc->sdn;
                e_uuid.e_nname = opc->sndn;
-               rs.sr_entry = &e_uuid;
                if ( opc->sreference && so->s_op->o_managedsait <= SLAP_CONTROL_IGNORED ) {
                        struct berval bv = BER_BVNULL;
                        rs.sr_ref = &bv;
@@ -928,9 +934,9 @@ syncprov_qplay( Operation *op, syncops *so )
                ldap_pvt_thread_mutex_unlock( &so->s_mutex );
 
                if ( sr->s_mode == LDAP_SYNC_NEW_COOKIE ) {
-                   SlapReply rs = { REP_INTERMEDIATE };
+                       SlapReply rs = { REP_INTERMEDIATE };
 
-                   rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE,
+                       rc = syncprov_sendinfo( op, &rs, LDAP_TAG_SYNC_NEW_COOKIE,
                                &sr->s_csn, 0, NULL, 0 );
                } else {
                        opc.sdn = sr->s_dn;
@@ -942,11 +948,11 @@ syncprov_qplay( Operation *op, syncops *so )
 
                        rc = syncprov_sendresp( op, &opc, so, sr->s_mode );
 
-                       if ( opc.se ) {
-                               if ( !dec_mutexint( opc.se->e_private )) {
-                                       opc.se->e_private = NULL;
-                                       entry_free ( opc.se );
-                               }
+               }
+               if ( sr->s_e ) {
+                       if ( !dec_mutexint( sr->s_e->e_private )) {
+                               sr->s_e->e_private = NULL;
+                               entry_free ( sr->s_e );
                        }
                }
 
@@ -981,7 +987,6 @@ syncprov_qtask( void *ctx, void *arg )
        Operation *op;
        BackendDB be;
        int rc;
-       OpExtra oex;
 
        op = &opbuf.ob_op;
        *op = *so->s_op;
@@ -1000,11 +1005,6 @@ syncprov_qtask( void *ctx, void *arg )
        be.be_flags |= SLAP_DBFLAG_OVERLAY;
        op->o_bd = &be;
        LDAP_SLIST_FIRST(&op->o_extra) = NULL;
-
-       /* Let syncprov_operational know it's us */
-       oex.oe_key = (void *)syncprov_qtask;
-       LDAP_SLIST_INSERT_HEAD(&op->o_extra, &oex, oe_next);
-
        op->o_callback = NULL;
 
        rc = syncprov_qplay( op, so );
@@ -1296,6 +1296,7 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
                }
 
                if ( fc.fscope ) {
+                       ldap_pvt_thread_mutex_lock( &ss->s_mutex );
                        op2 = *ss->s_op;
                        oh = *op->o_hdr;
                        oh.oh_conn = ss->s_op->o_conn;
@@ -1304,7 +1305,14 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
                        op2.o_hdr = &oh;
                        op2.o_extra = op->o_extra;
                        op2.o_callback = NULL;
-                       rc = test_filter( &op2, e, ss->s_op->ors_filter );
+                       if (ss->s_flags & PS_FIX_FILTER) {
+                               /* Skip the AND/GE clause that we stuck on in front. We
+                                  would lose deletes/mods that happen during the refresh
+                                  phase otherwise (ITS#6555) */
+                               op2.ors_filter = ss->s_op->ors_filter->f_and->f_next;
+                       }
+                       ldap_pvt_thread_mutex_unlock( &ss->s_mutex );
+                       rc = test_filter( &op2, e, op2.ors_filter );
                }
 
                Debug( LDAP_DEBUG_TRACE, "syncprov_matchops: sid %03x fscope %d rc %d\n",
@@ -1371,6 +1379,11 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs )
        syncmatches *sm, *snext;
        modtarget *mt, mtdummy;
 
+       ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+       if ( si->si_active )
+               si->si_active--;
+       ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+
        for (sm = opc->smatches; sm; sm=snext) {
                snext = sm->sm_next;
                syncprov_free_syncop( sm->sm_op );
@@ -1416,6 +1429,7 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
        SlapReply rsm = { 0 };
        slap_callback cb = {0};
        BackendDB be;
+       BackendInfo *bi;
 
 #ifdef CHECK_CSN
        Syntax *syn = slap_schema.si_ad_contextCSN->ad_type->sat_syntax;
@@ -1445,6 +1459,7 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
        }
        opm.o_req_dn = si->si_contextdn;
        opm.o_req_ndn = si->si_contextdn;
+       bi = opm.o_bd->bd_info;
        opm.o_bd->bd_info = on->on_info->oi_orig;
        opm.o_managedsait = SLAP_CONTROL_NONCRITICAL;
        opm.o_no_schema_check = 1;
@@ -1462,6 +1477,7 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
                if ( e == opm.ora_e )
                        be_entry_release_w( &opm, opm.ora_e );
        }
+       opm.o_bd->bd_info = bi;
 
        if ( mod.sml_next != NULL ) {
                slap_mods_free( mod.sml_next, 1 );
@@ -1538,7 +1554,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
        slap_overinst           *on = (slap_overinst *)op->o_bd->bd_info;
        slog_entry *se;
        int i, j, ndel, num, nmods, mmods;
-       char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
+       char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
        BerVarray uuids;
        struct berval delcsn[2];
 
@@ -1713,7 +1729,7 @@ syncprov_op_response( Operation *op, SlapReply *rs )
        if ( rs->sr_err == LDAP_SUCCESS )
        {
                struct berval maxcsn;
-               char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
+               char cbuf[LDAP_PVT_CSNSTR_BUFSIZE];
                int do_check = 0, have_psearches, foundit, csn_changed = 0;
 
                ldap_pvt_thread_mutex_lock( &si->si_resp_mutex );
@@ -1798,6 +1814,8 @@ syncprov_op_response( Operation *op, SlapReply *rs )
                                        csn_changed = 1;
                                }
                        }
+                       if ( csn_changed )
+                               si->si_dirty = 0;
                        ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
 
                        if ( csn_changed ) {
@@ -1841,6 +1859,7 @@ syncprov_op_response( Operation *op, SlapReply *rs )
                                }
                        }
                }
+               si->si_dirty = !csn_changed;
                ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
 
                if ( do_check ) {
@@ -1968,6 +1987,7 @@ syncprov_op_mod( Operation *op, SlapReply *rs )
 
        ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
        have_psearches = ( si->si_ops != NULL );
+       si->si_active++;
        ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
 
        cbsize = sizeof(slap_callback) + sizeof(opcookie) +
@@ -2358,6 +2378,7 @@ syncprov_op_search( Operation *op, SlapReply *rs )
        BerVarray ctxcsn;
        int i, *sids, numcsns;
        struct berval mincsn;
+       int dirty = 0;
 
        if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
 
@@ -2402,6 +2423,20 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                sop->s_inuse = 1;
 
                ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+               while ( si->si_active ) {
+                       /* Wait for active mods to finish before proceeding, as they
+                        * may already have inspected the si_ops list looking for
+                        * consumers to replicate the change to.  Using the log
+                        * doesn't help, as we may finish playing it before the
+                        * active mods gets added to it.
+                        */
+                       ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+                       if ( slapd_shutdown )
+                               return SLAPD_ABANDON;
+                       if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool ))
+                               ldap_pvt_thread_yield();
+                       ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+               }
                sop->s_next = si->si_ops;
                si->si_ops = sop;
                ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
@@ -2419,6 +2454,7 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                ctxcsn = NULL;
                sids = NULL;
        }
+       dirty = si->si_dirty;
        ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
        
        /* If we have a cookie, handle the PRESENT lookups */
@@ -2475,7 +2511,10 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                                        if ( newer < 0 )
                                                changed = SS_CHANGED;
                                        else if ( newer > 0 ) {
-                                       /* our state is older, tell consumer nothing */
+                                       /* our state is older, complain to consumer */
+                                               rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
+                                               rs->sr_text = "consumer state is newer than provider!";
+bailout:
                                                if ( sop ) {
                                                        syncops **sp = &si->si_ops;
                                                        
@@ -2486,7 +2525,6 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                                                        ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
                                                        ch_free( sop );
                                                }
-                                               rs->sr_err = LDAP_SUCCESS;
                                                rs->sr_ctrls = NULL;
                                                send_ldap_result( op, rs );
                                                return rs->sr_err;
@@ -2496,7 +2534,7 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                                if ( changed )
                                        break;
                        }
-                       if ( !changed ) {
+                       if ( !changed && !dirty ) {
                                do_present = 0;
 no_change:             if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
                                        LDAPControl     *ctrls[2];
@@ -2541,8 +2579,9 @@ no_change:                if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
                                        ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx );
                                if ( sids )
                                        op->o_tmpfree( sids, op->o_tmpmemctx );
-                               send_ldap_error( op, rs, LDAP_SYNC_REFRESH_REQUIRED, "sync cookie is stale" );
-                               return rs->sr_err;
+                               rs->sr_err = LDAP_SYNC_REFRESH_REQUIRED;
+                               rs->sr_text = "sync cookie is stale";
+                               goto bailout;
                        }
                        if ( srs->sr_state.ctxcsn ) {
                                ber_bvarray_free_x( srs->sr_state.ctxcsn, op->o_tmpmemctx );
@@ -2562,8 +2601,7 @@ no_change:                if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
                                        ber_bvarray_free_x( ctxcsn, op->o_tmpmemctx );
                                if ( sids )
                                        op->o_tmpfree( sids, op->o_tmpmemctx );
-                               send_ldap_result( op, rs );
-                               return rs->sr_err;
+                               goto bailout;
                        }
                }
        } else {
@@ -2580,7 +2618,7 @@ shortcut:
        }
 
        /* If something changed, find the changes */
-       if ( gotstate && changed ) {
+       if ( gotstate && ( changed || dirty ) ) {
                Filter *fand, *fava;
 
                fand = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
@@ -2596,10 +2634,14 @@ shortcut:
 #endif
                ber_dupbv_x( &fava->f_ava->aa_value, &mincsn, op->o_tmpmemctx );
                fava->f_next = op->ors_filter;
+               if ( sop )
+                       ldap_pvt_thread_mutex_lock( &sop->s_mutex );
                op->ors_filter = fand;
                filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
-               if ( sop )
+               if ( sop ) {
                        sop->s_flags |= PS_FIX_FILTER;
+                       ldap_pvt_thread_mutex_unlock( &sop->s_mutex );
+               }
        }
 
        /* Let our callback add needed info to returned entries */
@@ -2621,7 +2663,7 @@ shortcut:
         * the refresh phase, just invoke the response callback to transition
         * us into persist phase
         */
-       if ( !changed ) {
+       if ( !changed && !dirty ) {
                rs->sr_err = LDAP_SUCCESS;
                rs->sr_nentries = 0;
                send_ldap_result( op, rs );
@@ -2637,13 +2679,6 @@ syncprov_operational(
 {
        slap_overinst           *on = (slap_overinst *)op->o_bd->bd_info;
        syncprov_info_t         *si = (syncprov_info_t *)on->on_bi.bi_private;
-       OpExtra         *oex;
-
-       /* short-circuit, don't want backends handling this */
-       LDAP_SLIST_FOREACH(oex, &op->o_extra, oe_next) {
-               if ( oex->oe_key == (void *)syncprov_qtask )
-                       return LDAP_SUCCESS;
-       }
 
        /* This prevents generating unnecessarily; frontend will strip
         * any statically stored copy.
@@ -2869,7 +2904,7 @@ sp_cf_gen(ConfigArgs *c)
                }
                sl = si->si_logs;
                if ( !sl ) {
-                       sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE );
+                       sl = ch_malloc( sizeof( sessionlog ) + LDAP_PVT_CSNSTR_BUFSIZE );
                        sl->sl_mincsn.bv_val = (char *)(sl+1);
                        sl->sl_mincsn.bv_len = 0;
                        sl->sl_num = 0;
@@ -2975,7 +3010,7 @@ syncprov_db_open(
 
        /* Didn't find a contextCSN, should we generate one? */
        if ( !si->si_ctxcsn ) {
-               char csnbuf[ LDAP_LUTIL_CSNSTR_BUFSIZE ];
+               char csnbuf[ LDAP_PVT_CSNSTR_BUFSIZE ];
                struct berval csn;
 
                if ( SLAP_SYNC_SHADOW( op->o_bd )) {