+/* Play back queued responses */
+static int
+syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
+{
+ syncres *sr;
+ Entry *e;
+ opcookie opc;
+ int rc = 0;
+
+ opc.son = on;
+ op->o_bd->bd_info = (BackendInfo *)on->on_info;
+
+ for (;;) {
+ ldap_pvt_thread_mutex_lock( &so->s_mutex );
+ sr = so->s_res;
+ if ( sr )
+ so->s_res = sr->s_next;
+ if ( !so->s_res )
+ so->s_restail = NULL;
+ ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+
+ if ( !sr || so->s_op->o_abandon )
+ break;
+
+ opc.sdn = sr->s_dn;
+ opc.sndn = sr->s_ndn;
+ opc.suuid = sr->s_uuid;
+ opc.sctxcsn = sr->s_csn;
+ opc.sreference = sr->s_isreference;
+ e = NULL;
+
+ if ( sr->s_mode != LDAP_SYNC_DELETE ) {
+ rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
+ if ( rc ) {
+ ch_free( sr );
+ continue;
+ }
+ }
+ rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode );
+
+ if ( e ) {
+ be_entry_release_rw( op, e, 0 );
+ }
+
+ ch_free( sr );
+
+ if ( rc )
+ break;
+ }
+ op->o_bd->bd_info = (BackendInfo *)on;
+ return rc;
+}
+
+/* runqueue task for playing back queued responses */
+static void *
+syncprov_qtask( void *ctx, void *arg )
+{
+ struct re_s *rtask = arg;
+ syncops *so = rtask->arg;
+ slap_overinst *on = so->s_op->o_private;
+ OperationBuffer opbuf;
+ Operation *op;
+ BackendDB be;
+
+ op = (Operation *) &opbuf;
+ *op = *so->s_op;
+ op->o_hdr = (Opheader *)(op+1);
+ op->o_controls = (void **)(op->o_hdr+1);
+ memset( op->o_controls, 0, SLAP_MAX_CIDS * sizeof(void *));
+
+ *op->o_hdr = *so->s_op->o_hdr;
+
+ op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx);
+ op->o_tmpmfuncs = &slap_sl_mfuncs;
+ op->o_threadctx = ctx;
+
+ /* syncprov_qplay expects a fake db */
+ be = *so->s_op->o_bd;
+ be.be_flags |= SLAP_DBFLAG_OVERLAY;
+ op->o_bd = &be;
+ op->o_private = NULL;
+ op->o_callback = NULL;
+
+ (void)syncprov_qplay( op, on, so );
+
+ /* decrement use count... */
+ syncprov_free_syncop( so );
+
+ /* wait until we get explicitly scheduled again */
+ ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+ ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
+ ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 1 );
+ ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+
+ return NULL;
+}
+
+/* Start the task to play back queued psearch responses */
+static void
+syncprov_qstart( syncops *so )
+{
+ int wake=0;
+ ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+ if ( !so->s_qtask ) {
+ so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
+ syncprov_qtask, so, "syncprov_qtask",
+ so->s_op->o_conn->c_peer_name.bv_val );
+ ++so->s_inuse;
+ wake = 1;
+ } else {
+ if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
+ !so->s_qtask->next_sched.tv_sec ) {
+ so->s_qtask->interval.tv_sec = 0;
+ ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
+ so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
+ ++so->s_inuse;
+ wake = 1;
+ }
+ }
+ ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+ if ( wake )
+ slap_wake_listener();
+}
+
+/* Queue a persistent search response */
+static int
+syncprov_qresp( opcookie *opc, syncops *so, int mode )
+{
+ syncres *sr;
+
+ sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 +
+ opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 );
+ sr->s_next = NULL;
+ sr->s_dn.bv_val = (char *)(sr + 1);
+ sr->s_dn.bv_len = opc->sdn.bv_len;
+ sr->s_mode = mode;
+ sr->s_isreference = opc->sreference;
+ sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val,
+ opc->sdn.bv_val ) + 1;
+ sr->s_ndn.bv_len = opc->sndn.bv_len;
+ sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val,
+ opc->sndn.bv_val ) + 1;
+ sr->s_uuid.bv_len = opc->suuid.bv_len;
+ AC_MEMCPY( sr->s_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
+ sr->s_csn.bv_val = sr->s_uuid.bv_val + sr->s_uuid.bv_len + 1;
+ sr->s_csn.bv_len = opc->sctxcsn.bv_len;
+ strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val );
+
+ ldap_pvt_thread_mutex_lock( &so->s_mutex );
+ if ( !so->s_res ) {
+ so->s_res = sr;
+ } else {
+ so->s_restail->s_next = sr;
+ }
+ so->s_restail = sr;
+
+ /* If the base of the psearch was modified, check it next time round */
+ if ( so->s_flags & PS_WROTE_BASE ) {
+ so->s_flags ^= PS_WROTE_BASE;
+ so->s_flags |= PS_FIND_BASE;
+ }
+ if ( so->s_flags & PS_IS_DETACHED ) {
+ syncprov_qstart( so );
+ }
+ ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+ return LDAP_SUCCESS;
+}
+
+static int
+syncprov_drop_psearch( syncops *so, int lock )
+{
+ if ( so->s_flags & PS_IS_DETACHED ) {
+ if ( lock )
+ ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex );
+ so->s_op->o_conn->c_n_ops_executing--;
+ so->s_op->o_conn->c_n_ops_completed++;
+ LDAP_STAILQ_REMOVE( &so->s_op->o_conn->c_ops, so->s_op, slap_op,
+ o_next );
+ if ( lock )
+ ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex );
+ }
+ syncprov_free_syncop( so );
+
+ return 0;
+}
+
+static int
+syncprov_ab_cleanup( Operation *op, SlapReply *rs )
+{
+ slap_callback *sc = op->o_callback;
+ op->o_callback = sc->sc_next;
+ syncprov_drop_psearch( op->o_callback->sc_private, 0 );
+ op->o_tmpfree( sc, op->o_tmpmemctx );
+ return 0;
+}
+
+static int
+syncprov_op_abandon( Operation *op, SlapReply *rs )
+{
+ slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
+ syncprov_info_t *si = on->on_bi.bi_private;
+ syncops *so, *soprev;
+
+ ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+ for ( so=si->si_ops, soprev = (syncops *)&si->si_ops; so;
+ soprev=so, so=so->s_next ) {
+ if ( so->s_op->o_connid == op->o_connid &&
+ so->s_op->o_msgid == op->orn_msgid ) {
+ so->s_op->o_abandon = 1;
+ soprev->s_next = so->s_next;
+ break;
+ }
+ }
+ ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+ if ( so ) {
+ /* Is this really a Cancel exop? */
+ if ( op->o_tag != LDAP_REQ_ABANDON ) {
+ so->s_op->o_cancel = SLAP_CANCEL_ACK;
+ rs->sr_err = LDAP_CANCELLED;
+ send_ldap_result( so->s_op, rs );
+ if ( so->s_flags & PS_IS_DETACHED ) {
+ slap_callback *cb;
+ cb = op->o_tmpcalloc( 1, sizeof(slap_callback), op->o_tmpmemctx );
+ cb->sc_cleanup = syncprov_ab_cleanup;
+ cb->sc_next = op->o_callback;
+ cb->sc_private = so;
+ return SLAP_CB_CONTINUE;
+ }
+ }
+ syncprov_drop_psearch( so, 0 );
+ }
+ return SLAP_CB_CONTINUE;
+}
+
+/* Find which persistent searches are affected by this operation */