so that unused target connections can be properly reset.
assert( 0 );
break;
}
+
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
- asyncmeta_start_one_listener(mc, candidates, candidate);
+ asyncmeta_start_one_listener(mc, candidates, bc, candidate);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
typedef struct bm_context_t {
LDAP_SLIST_ENTRY(bm_context_t) bc_next;
time_t timeout;
- time_t stoptime;
+ time_t stoptime;
ldap_back_send_t sendok;
ldap_back_send_t retrying;
int candidate_match;
int is_ok;
SlapReply rs;
Operation *op;
- LDAPControl **ctrls;
+ LDAPControl **ctrls;
+ int *msgids;
SlapReply *candidates;
} bm_context_t;
/* NOTE: lc_lcflags is redefined to msc_mscflags to reuse the macros
* defined for back-ldap */
#define lc_lcflags msc_mscflags
-
+ int msc_pending_ops;
int msc_timeout_ops;
/* Connection for the select */
Connection *conn;
asyncmeta_send_result(bm_context_t* bc, int error, char *text);
int asyncmeta_new_bm_context(Operation *op, SlapReply *rs, bm_context_t **new_bc, int ntargets);
-int asyncmeta_start_listeners(a_metaconn_t *mc, SlapReply *candidates);
-int asyncmeta_start_one_listener(a_metaconn_t *mc, SlapReply *candidates, int candidate);
+int asyncmeta_start_listeners(a_metaconn_t *mc, SlapReply *candidates, bm_context_t *bc);
+int asyncmeta_start_one_listener(a_metaconn_t *mc, SlapReply *candidates, bm_context_t *bc, int candidate);
meta_search_candidate_t
asyncmeta_back_search_start(
msc->msc_time = 0;
msc->msc_mscflags = 0;
msc->msc_timeout_ops = 0;
+ msc->msc_pending_ops = 0;
return 0;
}
break;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
- asyncmeta_start_one_listener(mc, candidates, candidate);
+ asyncmeta_start_one_listener(mc, candidates, bc, candidate);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
return mc;
}
-int asyncmeta_start_listeners(a_metaconn_t *mc, SlapReply *candidates)
+int asyncmeta_start_listeners(a_metaconn_t *mc, SlapReply *candidates, bm_context_t *bc)
{
int i;
for (i = 0; i < mc->mc_info->mi_ntargets; i++) {
- asyncmeta_start_one_listener(mc, candidates, i);
+ asyncmeta_start_one_listener(mc, candidates, bc, i);
}
return LDAP_SUCCESS;
}
-int asyncmeta_start_one_listener(a_metaconn_t *mc, SlapReply *candidates, int candidate)
+int asyncmeta_start_one_listener(a_metaconn_t *mc,
+ SlapReply *candidates,
+ bm_context_t *bc,
+ int candidate)
{
a_metasingleconn_t *msc;
ber_socket_t s;
if (msc->msc_ld == NULL || !META_IS_CANDIDATE( &candidates[ candidate ] )) {
return LDAP_SUCCESS;
}
+ bc->msgids[candidate] = candidates[candidate].sr_msgid;
+ msc->msc_pending_ops++;
if ( msc->conn == NULL) {
ldap_get_option( msc->msc_ld, LDAP_OPT_DESC, &s );
if (s < 0) {
break;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
- asyncmeta_start_one_listener(mc, candidates, candidate);
+ asyncmeta_start_one_listener(mc, candidates, bc, candidate);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
int asyncmeta_new_bm_context(Operation *op, SlapReply *rs, bm_context_t **new_bc, int ntargets)
{
void *oldctx = op->o_tmpmemctx;
-
+ int i;
/* prevent old memctx from being destroyed */
slap_sl_mem_setctx(op->o_threadctx, NULL);
/* create new memctx */
(*new_bc)->op = asyncmeta_copy_op(op);
(*new_bc)->candidates = op->o_tmpcalloc(ntargets, sizeof(SlapReply),op->o_tmpmemctx);
+ (*new_bc)->msgids = op->o_tmpcalloc(ntargets, sizeof(int),op->o_tmpmemctx);
+ for (i = 0; i < ntargets; i++) {
+ (*new_bc)->msgids[i] = META_MSGID_UNDEFINED;
+ }
/* restore original memctx */
slap_sl_mem_setctx(op->o_threadctx, oldctx);
op->o_tmpmemctx = oldctx;
asyncmeta_drop_bc(a_metaconn_t *mc, bm_context_t *bc)
{
bm_context_t *om;
+ int i;
LDAP_SLIST_FOREACH( om, &mc->mc_om_list, bc_next ) {
if (om == bc) {
+ for (i = 0; i < mc->mc_info->mi_ntargets; i++)
+ {
+ if (bc->msgids[i] >= 0) {
+ mc->mc_conns[i].msc_pending_ops--;
+ }
+ }
LDAP_SLIST_REMOVE(&mc->mc_om_list, om, bm_context_t, bc_next);
mc->pending_ops--;
break;
for (j=0; j<ntargets; j++) {
i++;
if (i >= ntargets) i = 0;
- if (!mc->mc_conns[i].msc_ldr) continue;
+ if (!mc->mc_conns[i].msc_ldr || mc->mc_conns[i].msc_pending_ops <= 0) continue;
rc = ldap_result( mc->mc_conns[i].msc_ldr, LDAP_RES_ANY, LDAP_MSG_RECEIVED, &tv, &msg );
msc = &mc->mc_conns[i];
if (rc < 1) {
a_metasingleconn_t *msc = &mc->mc_conns[j];
a_metatarget_t *mt = mi->mi_targets[j];
msc->msc_timeout_ops++;
+ if (bc->msgids[j] >= 0) {
+ msc->msc_pending_ops--;
+ }
asyncmeta_back_cancel( mc, op,
bc->candidates[ j ].sr_msgid, j );
if (!META_BACK_TGT_QUARANTINE( mt ) ||
}
}
- if (!mc->pending_ops && mi->mi_idle_timeout) {
+ if (mi->mi_idle_timeout) {
for (j=0; j<mi->mi_ntargets; j++) {
a_metasingleconn_t *msc = &mc->mc_conns[j];
+ if (msc->msc_pending_ops > 0) {
+ continue;
+ }
if (msc->msc_ld && msc->msc_time > 0 && msc->msc_time + mi->mi_idle_timeout <= current_time) {
- if (mc->mc_active < 1) {
- asyncmeta_clear_one_msc(NULL, mc, j);
- }
+ asyncmeta_clear_one_msc(NULL, mc, j);
}
}
}
break;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
- asyncmeta_start_one_listener(mc, candidates, candidate);
+ asyncmeta_start_one_listener(mc, candidates, bc, candidate);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
break;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
- asyncmeta_start_one_listener(mc, candidates, candidate);
+ asyncmeta_start_one_listener(mc, candidates, bc, candidate);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;
initial_candidates = 0, candidate_match = 0,
needbind = 0;
ldap_back_send_t sendok = LDAP_BACK_SENDERR;
- long i;
+ long i,j;
int is_ok = 0;
void *savepriv;
SlapReply *candidates = NULL;
for ( i = 0; i < mi->mi_ntargets; i++ ) {
/* reset sr_msgid; it is used in most loops
* to check if that target is still to be considered */
- candidates[ i ].sr_msgid = META_MSGID_UNDEFINED;
-
+ candidates[i].sr_msgid = META_MSGID_UNDEFINED;
/* a target is marked as candidate by asyncmeta_getconn();
* if for any reason (an error, it's over or so) it is
* no longer active, sr_msgid is set to META_MSGID_IGNORE
goto finish;
}
ldap_pvt_thread_mutex_lock( &mc->mc_om_mutex);
- asyncmeta_start_listeners(mc, candidates);
+ asyncmeta_start_listeners(mc, candidates, bc);
ldap_pvt_thread_mutex_unlock( &mc->mc_om_mutex);
finish:
return rs->sr_err;