#define PS_IS_DETACHED 0x02
#define PS_WROTE_BASE 0x04
#define PS_FIND_BASE 0x08
+#define PS_FIX_FILTER 0x10
int s_inuse; /* reference count */
struct syncres *s_res;
return NULL;
}
+/* Start the task to play back queued psearch responses */
+static void
+syncprov_qstart( syncops *so )
+{
+ int wake=0;
+ ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+ if ( !so->s_qtask ) {
+ so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
+ syncprov_qtask, so, "syncprov_qtask",
+ so->s_op->o_conn->c_peer_name.bv_val );
+ ++so->s_inuse;
+ wake = 1;
+ } else {
+ if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
+ !so->s_qtask->next_sched.tv_sec ) {
+ so->s_qtask->interval.tv_sec = 0;
+ ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
+ so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
+ ++so->s_inuse;
+ wake = 1;
+ }
+ }
+ ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+ if ( wake )
+ slap_wake_listener();
+}
+
/* Queue a persistent search response */
static int
syncprov_qresp( opcookie *opc, syncops *so, int mode )
so->s_flags |= PS_FIND_BASE;
}
if ( so->s_flags & PS_IS_DETACHED ) {
- int wake=0;
- ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
- if ( !so->s_qtask ) {
- so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
- syncprov_qtask, so, "syncprov_qtask",
- so->s_op->o_conn->c_peer_name.bv_val );
- ++so->s_inuse;
- wake = 1;
- } else {
- if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
- !so->s_qtask->next_sched.tv_sec ) {
- so->s_qtask->interval.tv_sec = 0;
- ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
- so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
- ++so->s_inuse;
- wake = 1;
- }
- }
- ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
- if ( wake )
- slap_wake_listener();
+ syncprov_qstart( so );
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return LDAP_SUCCESS;
/* Update our context CSN */
cbuf[0] = '\0';
ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
- slap_get_commit_csn( op, &maxcsn, NULL );
+ slap_get_commit_csn( op, &maxcsn );
if ( !BER_BVISNULL( &maxcsn ) ) {
strcpy( cbuf, maxcsn.bv_val );
if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn ) > 0 ) {
op2->ors_filterstr.bv_val = ptr;
strcpy( ptr, so->s_filterstr.bv_val );
op2->ors_filterstr.bv_len = so->s_filterstr.bv_len;
- op2->ors_filter = filter_dup( op->ors_filter, NULL );
+
+ /* Skip the AND/GE clause that we stuck on in front */
+ if ( so->s_flags & PS_FIX_FILTER ) {
+ op2->ors_filter = op->ors_filter->f_and->f_next;
+ so->s_flags ^= PS_FIX_FILTER;
+ } else {
+ op2->ors_filter = op->ors_filter;
+ }
+ op2->ors_filter = filter_dup( op2->ors_filter, NULL );
so->s_op = op2;
/* Copy any cached group ACLs individually */
op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
} else {
/* It's RefreshAndPersist, transition to Persist phase */
- syncprov_sendinfo( op, rs, ( ss->ss_present && rs->sr_nentries ) ?
+ syncprov_sendinfo( op, rs, ss->ss_present ?
LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
&cookie, 1, NULL, 0 );
op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
ss->ss_so->s_flags ^= PS_IS_REFRESHING;
syncprov_detach_op( op, ss->ss_so, on );
+
+ /* If there are queued responses, fire them off */
+ if ( ss->ss_so->s_res )
+ syncprov_qstart( ss->ss_so );
ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
return LDAP_SUCCESS;
sl=si->si_logs;
if ( sl ) {
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
- if ( ber_bvcmp( &srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) {
+ /* Are there any log entries, and is the consumer state
+ * present in the session log?
+ */
+ if ( sl->sl_num > 0 && ber_bvcmp( &srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) {
do_present = 0;
/* mutex is unlocked in playlog */
syncprov_playlog( op, rs, sl, srs, &ctxcsn );
fava->f_next = op->ors_filter;
op->ors_filter = fand;
filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
+ if ( sop )
+ sop->s_flags |= PS_FIX_FILTER;
}
/* Let our callback add needed info to returned entries */