]> git.sur5r.net Git - openldap/blobdiff - servers/slapd/overlays/syncprov.c
ITS#5428
[openldap] / servers / slapd / overlays / syncprov.c
index ad8b9bb329e0b507e54f8dbaf4833156c4058aeb..9d7a9ad72154672bf4a44414170eb5de9a5fc551 100644 (file)
@@ -696,7 +696,7 @@ again:
                break;
        }
 
-       fop.o_bd->bd_info = on->on_info->oi_orig;
+       fop.o_bd->bd_info = (BackendInfo *)on->on_info;
        fop.o_bd->be_search( &fop, &frs );
        fop.o_bd->bd_info = (BackendInfo *)on;
 
@@ -837,8 +837,10 @@ syncprov_sendresp( Operation *op, opcookie *opc, syncops *so,
 
 /* Play back queued responses */
 static int
-syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
+syncprov_qplay( Operation *op, struct re_s *rtask )
 {
+       syncops *so = rtask->arg;
+       slap_overinst *on = LDAP_SLIST_FIRST(&so->s_op->o_extra)->oe_key;
        syncres *sr;
        Entry *e;
        opcookie opc;
@@ -853,10 +855,10 @@ syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
                        so->s_res = sr->s_next;
                if ( !so->s_res )
                        so->s_restail = NULL;
-               ldap_pvt_thread_mutex_unlock( &so->s_mutex );
-
+               /* Exit loop with mutex held */
                if ( !sr || so->s_op->o_abandon )
                        break;
+               ldap_pvt_thread_mutex_unlock( &so->s_mutex );
 
                opc.sdn = sr->s_dn;
                opc.sndn = sr->s_ndn;
@@ -883,9 +885,24 @@ syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
 
                ch_free( sr );
 
-               if ( rc )
+               if ( rc ) {
+                       /* Exit loop with mutex held */
+                       ldap_pvt_thread_mutex_lock( &so->s_mutex );
                        break;
+               }
        }
+
+       /* wait until we get explicitly scheduled again */
+       ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+       ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
+       if ( rc == 0 ) {
+               ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 );
+       } else {
+               /* bail out on any error */
+               ldap_pvt_runqueue_remove( &slapd_rq, rtask );
+       }
+       ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+       ldap_pvt_thread_mutex_unlock( &so->s_mutex );
        return rc;
 }
 
@@ -895,7 +912,6 @@ syncprov_qtask( void *ctx, void *arg )
 {
        struct re_s *rtask = arg;
        syncops *so = rtask->arg;
-       slap_overinst *on = so->s_op->o_private;
        OperationBuffer opbuf;
        Operation *op;
        BackendDB be;
@@ -917,25 +933,14 @@ syncprov_qtask( void *ctx, void *arg )
        be = *so->s_op->o_bd;
        be.be_flags |= SLAP_DBFLAG_OVERLAY;
        op->o_bd = &be;
-       op->o_private = NULL;
+       LDAP_SLIST_FIRST(&op->o_extra) = NULL;
        op->o_callback = NULL;
 
-       rc = syncprov_qplay( op, on, so );
+       rc = syncprov_qplay( op, rtask );
 
        /* decrement use count... */
        syncprov_free_syncop( so );
 
-       /* wait until we get explicitly scheduled again */
-       ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
-       ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
-       if ( rc == 0 ) {
-               ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 );
-       } else {
-               /* bail out on any error */
-               ldap_pvt_runqueue_remove( &slapd_rq, rtask );
-       }
-       ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
-
 #if 0  /* FIXME: connection_close isn't exported from slapd.
                 * should it be?
                 */
@@ -1209,7 +1214,7 @@ syncprov_matchops( Operation *op, opcookie *opc, int saveit )
                }
 
                /* check if current o_req_dn is in scope and matches filter */
-               if ( fc.fscope && test_filter( op, e, ss->s_op->ors_filter ) ==
+               if ( fc.fscope && test_filter( ss->s_op, e, ss->s_op->ors_filter ) ==
                        LDAP_COMPARE_TRUE ) {
                        if ( saveit ) {
                                sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );
@@ -1301,6 +1306,7 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
        Operation opm;
        SlapReply rsm = { 0 };
        slap_callback cb = {0};
+       BackendDB be;
 
        mod.sml_numvals = si->si_numcsns;
        mod.sml_values = si->si_ctxcsn;
@@ -1316,8 +1322,12 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
        opm.o_callback = &cb;
        opm.orm_modlist = &mod;
        opm.orm_no_opattrs = 1;
-       opm.o_req_dn = op->o_bd->be_suffix[0];
-       opm.o_req_ndn = op->o_bd->be_nsuffix[0];
+       if ( SLAP_GLUE_SUBORDINATE( op->o_bd )) {
+               be = *on->on_info->oi_origdb;
+               opm.o_bd = &be;
+       }
+       opm.o_req_dn = opm.o_bd->be_suffix[0];
+       opm.o_req_ndn = opm.o_bd->be_nsuffix[0];
        opm.o_bd->bd_info = on->on_info->oi_orig;
        opm.o_managedsait = SLAP_CONTROL_NONCRITICAL;
        opm.o_no_schema_check = 1;
@@ -1325,7 +1335,6 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
        if ( mod.sml_next != NULL ) {
                slap_mods_free( mod.sml_next, 1 );
        }
-       opm.orm_no_opattrs = 0;
 }
 
 static void
@@ -1519,7 +1528,7 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
                fop.ors_filter = ⁡
 
                cb.sc_response = playlog_cb;
-               fop.o_bd->bd_info = on->on_info->oi_orig;
+               fop.o_bd->bd_info = (BackendInfo *)on->on_info;
 
                for ( i=ndel; i<num; i++ ) {
                        if ( uuids[i].bv_len == 0 ) continue;
@@ -1540,13 +1549,16 @@ syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
        if ( ndel ) {
                struct berval cookie;
 
-               slap_compose_sync_cookie( op, &cookie, delcsn, srs->sr_state.rid,
-                       srs->sr_state.sid );
+               if ( delcsn[0].bv_len ) {
+                       slap_compose_sync_cookie( op, &cookie, delcsn, srs->sr_state.rid,
+                               srs->sr_state.sid );
+               }
 
                Debug( LDAP_DEBUG_SYNC, "syncprov_playlog: cookie=%s\n", cookie.bv_val, 0, 0 );
 
                uuids[ndel].bv_val = NULL;
-               syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, &cookie, 0, uuids, 1 );
+               syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET,
+                       delcsn[0].bv_len ? &cookie : NULL, 0, uuids, 1 );
                op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
        }
        op->o_tmpfree( uuids, op->o_tmpmemctx );
@@ -1775,7 +1787,13 @@ syncprov_op_mod( Operation *op, SlapReply *rs )
                        /* wait for this op to get to head of list */
                        while ( mt->mt_mods != mi ) {
                                ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
-                               ldap_pvt_thread_yield();
+                               /* FIXME: if dynamic config can delete overlays or
+                                * databases we'll have to check for cleanup here.
+                                * Currently it's not an issue because there are
+                                * no dynamic config deletes...
+                                */
+                               if ( !ldap_pvt_thread_pool_pausecheck( &connection_pool ))
+                                       ldap_pvt_thread_yield();
                                ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
 
                                /* clean up if the caller is giving up */
@@ -1843,6 +1861,7 @@ syncprov_search_cleanup( Operation *op, SlapReply *rs )
 typedef struct SyncOperationBuffer {
        Operation               sob_op;
        Opheader                sob_hdr;
+       OpExtra                 sob_oe;
        AttributeName   sob_extra;      /* not always present */
        /* Further data allocated here */
 } SyncOperationBuffer;
@@ -1871,6 +1890,7 @@ syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
        sopbuf2 = ch_calloc( 1, size );
        op2 = &sopbuf2->sob_op;
        op2->o_hdr = &sopbuf2->sob_hdr;
+       LDAP_SLIST_FIRST(&op2->o_extra) = &sopbuf2->sob_oe;
 
        /* Copy the fields we care about explicitly, leave the rest alone */
        *op2->o_hdr = *op->o_hdr;
@@ -1878,7 +1898,8 @@ syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
        op2->o_time = op->o_time;
        op2->o_bd = on->on_info->oi_origdb;
        op2->o_request = op->o_request;
-       op2->o_private = on;
+       LDAP_SLIST_FIRST(&op2->o_extra)->oe_key = on;
+       LDAP_SLIST_NEXT(LDAP_SLIST_FIRST(&op2->o_extra), oe_next) = NULL;
 
        ptr = (char *) sopbuf2 + offsetof( SyncOperationBuffer, sob_extra );
        if ( i ) {
@@ -1929,12 +1950,10 @@ syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
        op2->o_do_not_cache = 1;
 
        /* Add op2 to conn so abandon will find us */
-       ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
        op->o_conn->c_n_ops_executing++;
        op->o_conn->c_n_ops_completed--;
        LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next );
        so->s_flags |= PS_IS_DETACHED;
-       ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
 
        /* Prevent anyone else from trying to send a result for this op */
        op->o_abandon = 1;
@@ -2044,15 +2063,27 @@ syncprov_search_response( Operation *op, SlapReply *rs )
 
                        /* Detach this Op from frontend control */
                        ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
+                       ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
 
-                       /* Turn off the refreshing flag */
-                       ss->ss_so->s_flags ^= PS_IS_REFRESHING;
+                       /* But not if this connection was closed along the way */
+                       if ( op->o_abandon ) {
+                               ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
+                               ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
+                               syncprov_free_syncop( ss->ss_so );
+                               return SLAPD_ABANDON;
 
-                       syncprov_detach_op( op, ss->ss_so, on );
+                       } else {
+                               /* Turn off the refreshing flag */
+                               ss->ss_so->s_flags ^= PS_IS_REFRESHING;
 
-                       /* If there are queued responses, fire them off */
-                       if ( ss->ss_so->s_res )
-                               syncprov_qstart( ss->ss_so );
+                               syncprov_detach_op( op, ss->ss_so, on );
+
+                               ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
+
+                               /* If there are queued responses, fire them off */
+                               if ( ss->ss_so->s_res )
+                                       syncprov_qstart( ss->ss_so );
+                       }
                        ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
 
                        return LDAP_SUCCESS;
@@ -2212,6 +2243,9 @@ no_change:                if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
                                }
                                goto shortcut;
                        }
+               } else {
+                       /* consumer doesn't have the right number of CSNs */
+                       changed = SS_CHANGED;
                }
                /* Do we have a sessionlog for this search? */
                sl=si->si_logs;
@@ -2354,7 +2388,7 @@ syncprov_operational(
                                }
 
                                if ( !ap ) {
-                                       if ( !rs->sr_flags & REP_ENTRY_MODIFIABLE ) {
+                                       if ( !(rs->sr_flags & REP_ENTRY_MODIFIABLE) ) {
                                                rs->sr_entry = entry_dup( rs->sr_entry );
                                                rs->sr_flags |=
                                                        REP_ENTRY_MODIFIABLE|REP_ENTRY_MUSTBEFREED;