#include "lutil.h"
#include "slap.h"
+/* A modify request on a particular entry */
+typedef struct modinst {
+ struct modinst *mi_next;
+ Operation *mi_op;
+} modinst;
+
+typedef struct modtarget {
+ struct modinst *mt_mods;
+ struct modinst *mt_tail;
+ Operation *mt_op;
+ ldap_pvt_thread_mutex_t mt_mutex;
+} modtarget;
+
/* A queued result of a persistent search */
typedef struct syncres {
struct syncres *s_next;
long s_rid;
struct berval s_filterstr;
int s_flags; /* search status */
+ int s_inuse; /* reference count */
struct syncres *s_res;
struct syncres *s_restail;
ldap_pvt_thread_mutex_t s_mutex;
syncops *sm_op;
} syncmatches;
+/* The main state for this overlay */
typedef struct syncprov_info_t {
syncops *si_ops;
struct berval si_ctxcsn; /* ldapsync context */
int si_gotcsn; /* is our ctxcsn up to date? */
+ Avlnode *si_mods; /* entries being modified */
ldap_pvt_thread_mutex_t si_csn_mutex;
ldap_pvt_thread_mutex_t si_ops_mutex;
+ ldap_pvt_thread_mutex_t si_mods_mutex;
char si_ctxcsnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
} syncprov_info_t;
static AttributeName csn_anlist[2];
static AttributeName uuid_anlist[2];
+/* Build a LDAPsync intermediate state control */
static int
syncprov_state_ctrl(
Operation *op,
return LDAP_SUCCESS;
}
+/* Build a LDAPsync final state control */
static int
syncprov_done_ctrl(
Operation *op,
return LDAP_SUCCESS;
}
-
+#if 0
+/* Generate state based on session log - not implemented yet */
static int
syncprov_state_ctrl_from_slog(
Operation *op,
return LDAP_SUCCESS;
}
+#endif
-int
+static int
syncprov_sendinfo(
Operation *op,
SlapReply *rs,
return ret;
}
+ rs->sr_rspoid = LDAP_SYNC_INFO;
rs->sr_rspdata = &rspdata;
send_ldap_intermediate( op, rs );
rs->sr_rspdata = NULL;
return LDAP_SUCCESS;
}
+
+/* Find a modtarget in an AVL tree */
+static int
+sp_avl_cmp( const void *c1, const void *c2 )
+{
+ const modtarget *m1, *m2;
+ int rc;
+
+ m1 = c1; m2 = c2;
+ rc = m1->mt_op->o_req_ndn.bv_len - m2->mt_op->o_req_ndn.bv_len;
+
+ if ( rc ) return rc;
+ return ber_bvcmp( &m1->mt_op->o_req_ndn, &m2->mt_op->o_req_ndn );
+}
+
/* syncprov_findbase:
* finds the true DN of the base of a search (with alias dereferencing) and
* checks to make sure the base entry doesn't get replaced with a different
pc->num++;
ret = LDAP_SUCCESS;
if ( pc->num == SLAP_SYNCUUID_SET_SIZE ) {
- rs->sr_rspoid = LDAP_SYNC_INFO;
ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
0, pc->uuids, 0 );
ber_bvarray_free_x( pc->uuids, op->o_tmpmemctx );
} else if ( rs->sr_type == REP_RESULT ) {
ret = rs->sr_err;
if ( pc->num ) {
- rs->sr_rspoid = LDAP_SYNC_INFO;
ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
0, pc->uuids, 0 );
ber_bvarray_free_x( pc->uuids, op->o_tmpmemctx );
return rs.sr_err;
}
+static void
+syncprov_free_syncop( syncops *so )
+{
+ syncres *sr, *srnext;
+
+ ldap_pvt_thread_mutex_lock( &so->s_mutex );
+ so->s_inuse--;
+ if ( so->s_inuse > 0 ) {
+ ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+ return;
+ }
+ ldap_pvt_thread_mutex_unlock( &so->s_mutex );
+ filter_free( so->s_op->ors_filter );
+ ch_free( so->s_op );
+ ch_free( so->s_base.bv_val );
+ for ( sr=so->s_res; sr; sr=srnext ) {
+ srnext = sr->s_next;
+ ch_free( sr );
+ }
+ ldap_pvt_thread_mutex_destroy( &so->s_mutex );
+ ch_free( so );
+}
+
+static int
+syncprov_drop_psearch( syncops *so )
+{
+ ldap_pvt_thread_mutex_lock( &so->s_op->o_conn->c_mutex );
+ so->s_op->o_conn->c_n_ops_executing--;
+ so->s_op->o_conn->c_n_ops_completed++;
+ ldap_pvt_thread_mutex_unlock( &so->s_op->o_conn->c_mutex );
+ syncprov_free_syncop( so );
+}
+
+static int
+syncprov_op_abandon( Operation *op, SlapReply *rs )
+{
+ slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
+ syncprov_info_t *si = on->on_bi.bi_private;
+ syncops *so, *soprev;
+
+ ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+ for ( so=si->si_ops, soprev = (syncops *)&si->si_ops; so;
+ soprev=so, so=so->s_next ) {
+ if ( so->s_op->o_connid == op->o_connid &&
+ so->s_op->o_msgid == op->orn_msgid ) {
+ so->s_op->o_abandon = 1;
+ soprev->s_next = so->s_next;
+ break;
+ }
+ }
+ ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+ if ( so ) {
+ /* Is this really a Cancel exop? */
+ if ( op->o_tag != LDAP_REQ_ABANDON ) {
+ rs->sr_err = LDAP_CANCELLED;
+ send_ldap_result( so->s_op, rs );
+ }
+ syncprov_drop_psearch( so );
+ }
+ return SLAP_CB_CONTINUE;
+}
+
+/* Find which persistent searches are affected by this operation */
static void
syncprov_matchops( Operation *op, opcookie *opc, int saveit )
{
syncprov_info_t *si = on->on_bi.bi_private;
fbase_cookie fc;
- syncops *ss;
+ syncops *ss, *sprev, *snext;
Entry *e;
Attribute *a;
int rc;
}
ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
- for (ss = si->si_ops; ss; ss=ss->s_next)
+ for (ss = si->si_ops, sprev = (syncops *)&si->si_ops; ss;
+ sprev = ss, ss=snext)
{
syncmatches *sm;
int found = 0;
+ snext = ss->s_next;
/* validate base */
fc.fss = ss;
fc.fbase = 0;
fc.fscope = 0;
+
+ /* If the base of the search is missing, signal a refresh */
rc = syncprov_findbase( op, &fc );
- if ( rc != LDAP_SUCCESS ) continue;
+ if ( rc != LDAP_SUCCESS ) {
+ SlapReply rs = {REP_RESULT};
+ send_ldap_error( ss->s_op, &rs, LDAP_SYNC_REFRESH_REQUIRED,
+ "search base has changed" );
+ sprev->s_next = snext;
+ syncprov_drop_psearch( ss );
+ continue;
+ }
/* If we're sending results now, look for this op in old matches */
if ( !saveit ) {
sm = op->o_tmpalloc( sizeof(syncmatches), op->o_tmpmemctx );
sm->sm_next = opc->smatches;
sm->sm_op = ss;
+ ss->s_inuse++;
opc->smatches = sm;
} else {
/* if found send UPDATE else send ADD */
{
slap_callback *cb = op->o_callback;
opcookie *opc = cb->sc_private;
+ slap_overinst *on = opc->son;
+ syncprov_info_t *si = on->on_bi.bi_private;
syncmatches *sm, *snext;
+ modtarget *mt, mtdummy;
for (sm = opc->smatches; sm; sm=snext) {
snext = sm->sm_next;
+ syncprov_free_syncop( sm->sm_op );
op->o_tmpfree( sm, op->o_tmpmemctx );
}
+
+ /* Remove op from lock table */
+ mtdummy.mt_op = op;
+ ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
+ mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp );
+ if ( mt ) {
+ modinst *mi = mt->mt_mods;
+
+ /* If there are more, promote the next one */
+ ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
+ if ( mi->mi_next ) {
+ mt->mt_mods = mi->mi_next;
+ mt->mt_op = mt->mt_mods->mi_op;
+ ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
+ } else {
+ avl_delete( &si->si_mods, mt, sp_avl_cmp );
+ ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
+ ldap_pvt_thread_mutex_destroy( &mt->mt_mutex );
+ ch_free( mt );
+ }
+ }
+ ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
op->o_callback = cb->sc_next;
op->o_tmpfree(cb, op->o_tmpmemctx);
}
struct berval maxcsn;
char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
+ /* Update our context CSN */
cbuf[0] = '\0';
ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
slap_get_commit_csn( op, &maxcsn );
opc->sctxcsn.bv_len = maxcsn.bv_len;
opc->sctxcsn.bv_val = cbuf;
+ /* Handle any persistent searches */
if ( si->si_ops ) {
switch(op->o_tag) {
case LDAP_REQ_ADD:
/* for each match in opc->smatches:
* send DELETE msg
*/
+ ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
for ( sm = opc->smatches; sm; sm=sm->sm_next ) {
+ if ( sm->sm_op->s_op->o_abandon )
+ continue;
syncprov_sendresp( op, opc, sm->sm_op, NULL,
LDAP_SYNC_DELETE, 1 );
}
+ ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
break;
}
}
return SLAP_CB_CONTINUE;
}
-#if 0
+/* We don't use a subentry to store the context CSN any more.
+ * We expose the current context CSN as an operational attribute
+ * of the suffix entry.
+ */
static int
syncprov_op_compare( Operation *op, SlapReply *rs )
{
syncprov_info_t *si = on->on_bi.bi_private;
int rc = SLAP_CB_CONTINUE;
- if ( dn_match( &op->o_req_ndn, &si->si_e->e_nname ) )
+ if ( dn_match( &op->o_req_ndn, op->o_bd->be_nsuffix ) &&
+ op->oq_compare.rs_ava->aa_desc == slap_schema.si_ad_contextCSN )
{
- Attribute *a;
+ Entry e = {0};
+ Attribute a = {0};
+ struct berval bv[2];
- ldap_pvt_thread_mutex_lock( &si->si_e_mutex );
+ e.e_name = op->o_bd->be_suffix[0];
+ e.e_nname = op->o_bd->be_nsuffix[0];
- if ( get_assert( op ) &&
- ( test_filter( op, si->si_e, get_assertion( op ) ) != LDAP_COMPARE_TRUE ) )
- {
- rs->sr_err = LDAP_ASSERTION_FAILED;
- goto return_results;
- }
+ bv[1].bv_val = NULL;
+ bv[0] = si->si_ctxcsn;
- rs->sr_err = access_allowed( op, si->si_e, op->oq_compare.rs_ava->aa_desc,
+ a.a_desc = slap_schema.si_ad_contextCSN;
+ a.a_vals = bv;
+ a.a_nvals = a.a_vals;
+
+ ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
+
+ rs->sr_err = access_allowed( op, &e, op->oq_compare.rs_ava->aa_desc,
&op->oq_compare.rs_ava->aa_value, ACL_COMPARE, NULL );
if ( ! rs->sr_err ) {
rs->sr_err = LDAP_INSUFFICIENT_ACCESS;
goto return_results;
}
- rs->sr_err = LDAP_NO_SUCH_ATTRIBUTE;
+ if ( get_assert( op ) &&
+ ( test_filter( op, &e, get_assertion( op ) ) != LDAP_COMPARE_TRUE ) )
+ {
+ rs->sr_err = LDAP_ASSERTION_FAILED;
+ goto return_results;
+ }
+
+
+ rs->sr_err = LDAP_COMPARE_FALSE;
- for ( a = attr_find( si->si_e->e_attrs, op->oq_compare.rs_ava->aa_desc );
- a != NULL;
- a = attr_find( a->a_next, op->oq_compare.rs_ava->aa_desc ) )
+ if ( value_find_ex( op->oq_compare.rs_ava->aa_desc,
+ SLAP_MR_ATTRIBUTE_VALUE_NORMALIZED_MATCH |
+ SLAP_MR_ASSERTED_VALUE_NORMALIZED_MATCH,
+ a.a_nvals, &op->oq_compare.rs_ava->aa_value, op->o_tmpmemctx ) == 0 )
{
- rs->sr_err = LDAP_COMPARE_FALSE;
-
- if ( value_find_ex( op->oq_compare.rs_ava->aa_desc,
- SLAP_MR_ATTRIBUTE_VALUE_NORMALIZED_MATCH |
- SLAP_MR_ASSERTED_VALUE_NORMALIZED_MATCH,
- a->a_nvals, &op->oq_compare.rs_ava->aa_value, op->o_tmpmemctx ) == 0 )
- {
- rs->sr_err = LDAP_COMPARE_TRUE;
- break;
- }
+ rs->sr_err = LDAP_COMPARE_TRUE;
}
return_results:;
- ldap_pvt_thread_mutex_unlock( &si->si_e_mutex );
+ ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
send_ldap_result( op, rs );
rc = rs->sr_err;
}
- return SLAP_CB_CONTINUE;
+ return rc;
}
-#endif
static int
syncprov_op_mod( Operation *op, SlapReply *rs )
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
syncprov_info_t *si = on->on_bi.bi_private;
- slap_callback *cb = op->o_tmpcalloc(1, sizeof(slap_callback)+sizeof(opcookie), op->o_tmpmemctx);
+ slap_callback *cb = op->o_tmpcalloc(1, sizeof(slap_callback)+
+ sizeof(opcookie) +
+ (si->si_ops ? sizeof(modinst) : 0 ),
+ op->o_tmpmemctx);
opcookie *opc = (opcookie *)(cb+1);
opc->son = on;
cb->sc_response = syncprov_op_response;
cb->sc_next = op->o_callback;
op->o_callback = cb;
- if ( si->si_ops && op->o_tag != LDAP_REQ_ADD )
- syncprov_matchops( op, opc, 1 );
+ /* If there are active persistent searches, lock this operation.
+ * See seqmod.c for the locking logic on its own.
+ */
+ if ( si->si_ops ) {
+ modtarget *mt, mtdummy;
+ modinst *mi;
+
+ mi = (modinst *)(opc+1);
+ mi->mi_op = op;
+
+ /* See if we're already modifying this entry... */
+ mtdummy.mt_op = op;
+ ldap_pvt_thread_mutex_lock( &si->si_mods_mutex );
+ mt = avl_find( si->si_mods, &mtdummy, sp_avl_cmp );
+ if ( mt ) {
+ ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
+ ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
+ mt->mt_tail->mi_next = mi;
+ mt->mt_tail = mi;
+ /* wait for this op to get to head of list */
+ while ( mt->mt_mods != mi ) {
+ ldap_pvt_thread_mutex_unlock( &mt->mt_mutex );
+ ldap_pvt_thread_yield();
+ ldap_pvt_thread_mutex_lock( &mt->mt_mutex );
+ }
+ } else {
+ /* Record that we're modifying this entry now */
+ mt = malloc( sizeof(modtarget) );
+ mt->mt_mods = mi;
+ mt->mt_tail = mi;
+ mt->mt_op = mi->mi_op;
+ ldap_pvt_thread_mutex_init( &mt->mt_mutex );
+ avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error );
+ ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
+ }
+
+ if ( op->o_tag != LDAP_REQ_ADD )
+ syncprov_matchops( op, opc, 1 );
+ }
return SLAP_CB_CONTINUE;
}
/* Increment number of ops so that idletimeout ignores us */
ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
op->o_conn->c_n_ops_executing++;
+ op->o_conn->c_n_ops_completed--;
ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
}
} else {
int locked = 0;
/* It's RefreshAndPersist, transition to Persist phase */
- rs->sr_rspoid = LDAP_SYNC_INFO;
syncprov_sendinfo( op, rs, rs->sr_nentries ?
LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
&cookie, 1, NULL, 0 );
sop->s_sid = srs->sr_state.sid;
sop->s_rid = srs->sr_state.rid;
sop->s_next = si->si_ops;
+ sop->s_inuse = 1;
si->si_ops = sop;
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
}
fava->f_choice = LDAP_FILTER_LE;
fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
+ ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
ber_dupbv_x( &fava->f_ava->aa_value, &si->si_ctxcsn, op->o_tmpmemctx );
+ ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
fand->f_and = fava;
if ( gotstate ) {
fava->f_next = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
return SLAP_CB_CONTINUE;
}
+static int
+syncprov_operational(
+ Operation *op,
+ SlapReply *rs )
+{
+ slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
+ syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
+
+ if ( rs->sr_entry &&
+ dn_match( &rs->sr_entry->e_nname, op->o_bd->be_nsuffix )) {
+
+ Attribute **ap;
+
+ for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next ) ;
+
+ if ( SLAP_OPATTRS( rs->sr_attr_flags ) ||
+ ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) {
+
+ Attribute *a = ch_malloc( sizeof(Attribute));
+ a->a_desc = slap_schema.si_ad_contextCSN;
+ a->a_vals = ch_malloc( 2 * sizeof(struct berval));
+
+ ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
+ ber_dupbv( &a->a_vals[0], &si->si_ctxcsn );
+ ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+
+ a->a_vals[1].bv_val = NULL;
+ a->a_nvals = a->a_vals;
+ a->a_next = NULL;
+ a->a_flags = 0;
+
+ *ap = a;
+ }
+ }
+ return LDAP_SUCCESS;
+}
+
static int
syncprov_db_config(
BackendDB *be,
syncprov.on_bi.bi_db_config = syncprov_db_config;
syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;
+ syncprov.on_bi.bi_op_abandon = syncprov_op_abandon;
+ syncprov.on_bi.bi_op_cancel = syncprov_op_abandon;
+
syncprov.on_bi.bi_op_add = syncprov_op_mod;
-#if 0
syncprov.on_bi.bi_op_compare = syncprov_op_compare;
-#endif
syncprov.on_bi.bi_op_delete = syncprov_op_mod;
syncprov.on_bi.bi_op_modify = syncprov_op_mod;
syncprov.on_bi.bi_op_modrdn = syncprov_op_mod;
syncprov.on_bi.bi_op_search = syncprov_op_search;
syncprov.on_bi.bi_extended = syncprov_op_extended;
+ syncprov.on_bi.bi_operational = syncprov_operational;
#if 0
syncprov.on_response = syncprov_response;