#include "lutil.h"
#include "slap.h"
#include "config.h"
+#include "ldap_rq.h"
/* A modify request on a particular entry */
typedef struct modinst {
int s_inuse; /* reference count */
struct syncres *s_res;
struct syncres *s_restail;
+ void *s_qtask; /* task for playing psearch responses */
ldap_pvt_thread_mutex_t s_mutex;
} syncops;
return rc;
}
-/* Queue a persistent search response */
-static int
-syncprov_qresp( opcookie *opc, syncops *so, int mode )
-{
- syncres *sr;
-
- sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 +
- opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 );
- sr->s_next = NULL;
- sr->s_dn.bv_val = (char *)(sr + 1);
- sr->s_dn.bv_len = opc->sdn.bv_len;
- sr->s_mode = mode;
- sr->s_isreference = opc->sreference;
- sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val );
- sr->s_ndn.bv_len = opc->sndn.bv_len;
- *(sr->s_ndn.bv_val++) = '\0';
- sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val );
- sr->s_uuid.bv_len = opc->suuid.bv_len;
- *(sr->s_uuid.bv_val++) = '\0';
- sr->s_csn.bv_val = lutil_strcopy( sr->s_uuid.bv_val, opc->suuid.bv_val );
- sr->s_csn.bv_len = opc->sctxcsn.bv_len;
- strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val );
-
- if ( !so->s_res ) {
- so->s_res = sr;
- } else {
- so->s_restail->s_next = sr;
- }
- so->s_restail = sr;
- ldap_pvt_thread_mutex_unlock( &so->s_mutex );
- return LDAP_SUCCESS;
-}
-
-/* Play back queued responses */
-static int
-syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue );
-
-static int
-syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
-{
- syncres *sr, *srnext;
- Entry *e;
- opcookie opc;
- int rc;
-
- opc.son = on;
- op->o_bd->bd_info = (BackendInfo *)on->on_info;
- for (sr = so->s_res; sr; sr=srnext) {
- srnext = sr->s_next;
- opc.sdn = sr->s_dn;
- opc.sndn = sr->s_ndn;
- opc.suuid = sr->s_uuid;
- opc.sctxcsn = sr->s_csn;
- opc.sreference = sr->s_isreference;
- e = NULL;
-
- if ( sr->s_mode != LDAP_SYNC_DELETE ) {
- rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
- if ( rc ) {
- ch_free( sr );
- so->s_res = srnext;
- continue;
- }
- }
- rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode, 0 );
-
- if ( e ) {
- be_entry_release_rw( op, e, 0 );
- }
- if ( rc )
- break;
-
- ch_free( sr );
- so->s_res = srnext;
- }
- op->o_bd->bd_info = (BackendInfo *)on;
- if ( !so->s_res )
- so->s_restail = NULL;
- return rc;
-}
-
/* Send a persistent search response */
static int
-syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode, int queue )
+syncprov_sendresp( Operation *op, opcookie *opc, syncops *so, Entry **e, int mode)
{
slap_overinst *on = opc->son;
struct berval cookie;
Entry e_uuid = {0};
Attribute a_uuid = {0};
- Operation sop = *so->s_op;
- Opheader ohdr;
if ( so->s_op->o_abandon )
return SLAPD_ABANDON;
- ohdr = *sop.o_hdr;
- sop.o_hdr = &ohdr;
- sop.o_tmpmemctx = op->o_tmpmemctx;
- sop.o_bd = op->o_bd;
- sop.o_controls = op->o_controls;
- sop.o_private = op->o_private;
- sop.o_callback = NULL;
-
- /* If queueing is allowed */
- if ( queue ) {
- ldap_pvt_thread_mutex_lock( &so->s_mutex );
- /* If we're still in refresh mode, must queue */
- if (so->s_flags & PS_IS_REFRESHING) {
- return syncprov_qresp( opc, so, mode );
- }
- /* If connection is free but queue is non-empty,
- * try to flush the queue.
- */
- if ( so->s_res ) {
- rs.sr_err = syncprov_qplay( &sop, on, so );
- }
- /* If the connection is busy, must queue */
- if ( sop.o_conn->c_writewaiter || rs.sr_err == LDAP_BUSY ) {
- return syncprov_qresp( opc, so, mode );
- }
- ldap_pvt_thread_mutex_unlock( &so->s_mutex );
-
- /* If syncprov_qplay returned any other error, bail out. */
- if ( rs.sr_err ) {
- return rs.sr_err;
- }
- } else {
- /* Queueing not allowed and conn is busy, give up */
- if ( sop.o_conn->c_writewaiter )
- return LDAP_BUSY;
- }
-
ctrls[1] = NULL;
slap_compose_sync_cookie( op, &cookie, &opc->sctxcsn, so->s_rid );
e_uuid.e_attrs = &a_uuid;
a_uuid.a_desc = slap_schema.si_ad_entryUUID;
a_uuid.a_nvals = &opc->suuid;
- rs.sr_err = syncprov_state_ctrl( &sop, &rs, &e_uuid,
+ rs.sr_err = syncprov_state_ctrl( op, &rs, &e_uuid,
mode, ctrls, 0, 1, &cookie );
rs.sr_ctrls = ctrls;
if ( rs.sr_entry->e_private )
rs.sr_flags = REP_ENTRY_MUSTRELEASE;
if ( opc->sreference ) {
- rs.sr_ref = get_entry_referrals( &sop, rs.sr_entry );
- send_search_reference( &sop, &rs );
+ rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
+ send_search_reference( op, &rs );
ber_bvarray_free( rs.sr_ref );
if ( !rs.sr_entry )
*e = NULL;
rs.sr_entry = *e;
if ( rs.sr_entry->e_private )
rs.sr_flags = REP_ENTRY_MUSTRELEASE;
- rs.sr_attrs = sop.ors_attrs;
- send_search_entry( &sop, &rs );
+ rs.sr_attrs = op->ors_attrs;
+ send_search_entry( op, &rs );
if ( !rs.sr_entry )
*e = NULL;
break;
if ( opc->sreference ) {
struct berval bv = BER_BVNULL;
rs.sr_ref = &bv;
- send_search_reference( &sop, &rs );
+ send_search_reference( op, &rs );
} else {
- send_search_entry( &sop, &rs );
+ send_search_entry( op, &rs );
}
break;
default:
assert(0);
}
op->o_tmpfree( rs.sr_ctrls[0], op->o_tmpmemctx );
- op->o_private = sop.o_private;
rs.sr_ctrls = NULL;
- /* Check queue again here; if we were hanging in a send and eventually
- * recovered, there may be more to send now. But don't check if the
- * original psearch has been abandoned.
- */
- if ( so->s_op->o_abandon )
- return SLAPD_ABANDON;
- if ( rs.sr_err == LDAP_SUCCESS && queue && so->s_res ) {
+ return rs.sr_err;
+}
+
+/* Play back queued responses */
+static int
+syncprov_qplay( Operation *op, slap_overinst *on, syncops *so )
+{
+ syncres *sr;
+ Entry *e;
+ opcookie opc;
+ int rc;
+
+ opc.son = on;
+ op->o_bd->bd_info = (BackendInfo *)on->on_info;
+
+ for (;;) {
ldap_pvt_thread_mutex_lock( &so->s_mutex );
- rs.sr_err = syncprov_qplay( &sop, on, so );
+ sr = so->s_res;
+ if ( sr )
+ so->s_res = sr->s_next;
+ if ( !so->s_res )
+ so->s_restail = NULL;
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+
+ if ( !sr )
+ break;
+
+ if ( !so->s_op->o_abandon ) {
+ opc.sdn = sr->s_dn;
+ opc.sndn = sr->s_ndn;
+ opc.suuid = sr->s_uuid;
+ opc.sctxcsn = sr->s_csn;
+ opc.sreference = sr->s_isreference;
+ e = NULL;
+
+ if ( sr->s_mode != LDAP_SYNC_DELETE ) {
+ rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
+ if ( rc ) {
+ ch_free( sr );
+ continue;
+ }
+ }
+ rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode );
+
+ if ( e ) {
+ be_entry_release_rw( op, e, 0 );
+ }
+ if ( rc )
+ break;
+ }
+
+ ch_free( sr );
}
- return rs.sr_err;
+ op->o_bd->bd_info = (BackendInfo *)on;
+ return rc;
+}
+
+/* runqueue task for playing back queued responses */
+static void *
+syncprov_qtask( void *ctx, void *arg )
+{
+ struct re_s *rtask = arg;
+ syncops *so = rtask->arg;
+ slap_overinst *on = so->s_op->o_private;
+ char opbuf[OPERATION_BUFFER_SIZE];
+ Operation *op;
+ BackendDB be;
+
+ op = (Operation *)opbuf;
+ memset( op, 0, sizeof(opbuf));
+ op->o_hdr = (Opheader *)(op+1);
+ op->o_controls = (void **)(op->o_hdr+1);
+
+ *op->o_hdr = *so->s_op->o_hdr;
+
+ op->o_tmpmemctx = slap_sl_mem_create(SLAP_SLAB_SIZE, SLAP_SLAB_STACK, ctx);
+ op->o_tmpmfuncs = &slap_sl_mfuncs;
+ op->o_threadctx = ctx;
+
+ /* syncprov_qplay expects a fake db */
+ be = *so->s_op->o_bd;
+ be.be_flags |= SLAP_DBFLAG_OVERLAY;
+ op->o_bd = &be;
+ op->o_private = NULL;
+ op->o_callback = NULL;
+
+ syncprov_qplay( op, on, so );
+
+ /* wait until we get explicitly scheduled again */
+ ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+ ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
+ ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 1 );
+ ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+
+ return NULL;
+}
+
+/* Queue a persistent search response */
+static int
+syncprov_qresp( opcookie *opc, syncops *so, int mode )
+{
+ syncres *sr;
+
+ ldap_pvt_thread_mutex_lock( &so->s_mutex );
+ sr = ch_malloc(sizeof(syncres) + opc->suuid.bv_len + 1 +
+ opc->sdn.bv_len + 1 + opc->sndn.bv_len + 1 + opc->sctxcsn.bv_len + 1 );
+ sr->s_next = NULL;
+ sr->s_dn.bv_val = (char *)(sr + 1);
+ sr->s_dn.bv_len = opc->sdn.bv_len;
+ sr->s_mode = mode;
+ sr->s_isreference = opc->sreference;
+ sr->s_ndn.bv_val = lutil_strcopy( sr->s_dn.bv_val, opc->sdn.bv_val );
+ sr->s_ndn.bv_len = opc->sndn.bv_len;
+ *(sr->s_ndn.bv_val++) = '\0';
+ sr->s_uuid.bv_val = lutil_strcopy( sr->s_ndn.bv_val, opc->sndn.bv_val );
+ sr->s_uuid.bv_len = opc->suuid.bv_len;
+ *(sr->s_uuid.bv_val++) = '\0';
+ sr->s_csn.bv_val = lutil_strcopy( sr->s_uuid.bv_val, opc->suuid.bv_val );
+ sr->s_csn.bv_len = opc->sctxcsn.bv_len;
+ strcpy( sr->s_csn.bv_val, opc->sctxcsn.bv_val );
+
+ if ( !so->s_res ) {
+ so->s_res = sr;
+ } else {
+ so->s_restail->s_next = sr;
+ }
+ so->s_restail = sr;
+ if ( so->s_flags & PS_IS_DETACHED ) {
+ ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+ if ( !so->s_qtask ) {
+ so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, 0,
+ syncprov_qtask, so, "syncprov_qtask",
+ so->s_op->o_conn->c_peer_name.bv_val );
+ } else {
+ if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask )) {
+ ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
+ }
+ }
+ ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+ }
+ ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+ return LDAP_SUCCESS;
}
static void
e = op->ora_e;
}
- if ( saveit ) {
+ if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
ber_dupbv_x( &opc->sdn, &e->e_name, op->o_tmpmemctx );
ber_dupbv_x( &opc->sndn, &e->e_nname, op->o_tmpmemctx );
opc->sreference = is_entry_referral( e );
- }
- if ( saveit || op->o_tag == LDAP_REQ_ADD ) {
a = attr_find( e->e_attrs, slap_schema.si_ad_entryUUID );
if ( a )
ber_dupbv_x( &opc->suuid, &a->a_nvals[0], op->o_tmpmemctx );
/* if found send UPDATE else send ADD */
ss->s_inuse++;
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
- syncprov_sendresp( op, opc, ss, &e,
- found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD, 1 );
+ syncprov_qresp( opc, ss,
+ found ? LDAP_SYNC_MODIFY : LDAP_SYNC_ADD );
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
ss->s_inuse--;
}
} else if ( !saveit && found ) {
/* send DELETE */
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
- syncprov_sendresp( op, opc, ss, NULL, LDAP_SYNC_DELETE, 1 );
+ syncprov_qresp( opc, ss, LDAP_SYNC_DELETE );
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
}
}
for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
if ( sm->sm_op->s_op->o_abandon )
continue;
- syncprov_sendresp( op, opc, sm->sm_op, NULL,
- LDAP_SYNC_DELETE, 1 );
+ syncprov_qresp( opc, sm->sm_op, LDAP_SYNC_DELETE );
}
break;
}
}
static void
-syncprov_detach_op( Operation *op, syncops *so )
+syncprov_detach_op( Operation *op, syncops *so, slap_overinst *on )
{
Operation *op2;
int i, alen = 0;
*op2->o_hdr = *op->o_hdr;
op2->o_tag = op->o_tag;
op2->o_time = op->o_time;
- op2->o_bd = op->o_bd;
+ op2->o_bd = on->on_info->oi_origdb;
op2->o_request = op->o_request;
+ op2->o_private = on;
if ( i ) {
op2->ors_attrs = (AttributeName *)(op2->o_hdr + 1);
0, 1, &cookie, ss->ss_present ? LDAP_SYNC_REFRESH_PRESENTS :
LDAP_SYNC_REFRESH_DELETES );
} else {
- int locked = 0;
/* It's RefreshAndPersist, transition to Persist phase */
syncprov_sendinfo( op, rs, ( ss->ss_present && rs->sr_nentries ) ?
LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
&cookie, 1, NULL, 0 );
/* Flush any queued persist messages */
if ( ss->ss_so->s_res ) {
- ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
- locked = 1;
syncprov_qplay( op, on, ss->ss_so );
}
+ /* Detach this Op from frontend control */
+ ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
+
/* Turn off the refreshing flag */
ss->ss_so->s_flags ^= PS_IS_REFRESHING;
- if ( locked )
- ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
- /* Detach this Op from frontend control */
- syncprov_detach_op( op, ss->ss_so );
+ syncprov_detach_op( op, ss->ss_so, on );
+ ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
return LDAP_SUCCESS;
}