/* syncprov.c - syncrepl provider */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
- * Copyright 2004-2005 The OpenLDAP Foundation.
+ * Copyright 2004-2008 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
#define PS_IS_DETACHED 0x02
#define PS_WROTE_BASE 0x04
#define PS_FIND_BASE 0x08
+#define PS_FIX_FILTER 0x10
int s_inuse; /* reference count */
struct syncres *s_res;
time_t si_chklast; /* time of last checkpoint */
Avlnode *si_mods; /* entries being modified */
sessionlog *si_logs;
- ldap_pvt_thread_mutex_t si_csn_mutex;
+ ldap_pvt_thread_rdwr_t si_csn_rwlock;
ldap_pvt_thread_mutex_t si_ops_mutex;
ldap_pvt_thread_mutex_t si_mods_mutex;
char si_ctxcsnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
slap_callback cb = {0};
Operation fop;
SlapReply frs = { REP_RESULT };
+ BackendInfo *bi;
int rc;
fc->fss->s_flags ^= PS_FIND_BASE;
fop.o_bd = op->o_bd;
fop.o_time = op->o_time;
fop.o_tincr = op->o_tincr;
+ bi = op->o_bd->bd_info;
cb.sc_response = findbase_cb;
cb.sc_private = fc;
fop.ors_filter = &generic_filter;
fop.ors_filterstr = generic_filterstr;
- fop.o_bd->bd_info = on->on_info->oi_orig;
- rc = fop.o_bd->be_search( &fop, &frs );
- fop.o_bd->bd_info = (BackendInfo *)on;
+ rc = overlay_op_walk( &fop, &frs, op_search, on->on_info, on );
+ op->o_bd->bd_info = bi;
} else {
ldap_pvt_thread_mutex_unlock( &fc->fss->s_mutex );
fc->fbase = 1;
char buf[LDAP_LUTIL_CSNSTR_BUFSIZE + STRLENOF("(entryCSN<=)")];
char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
struct berval maxcsn;
- Filter cf, af;
+ Filter cf;
#ifdef LDAP_COMP_MATCH
AttributeAssertion eq = { NULL, BER_BVNULL, NULL };
#else
} else {
cf.f_choice = LDAP_FILTER_LE;
fop.ors_limit = &fc_limits;
+ memset( &fc_limits, 0, sizeof( fc_limits ));
fc_limits.lms_s_unchecked = 1;
fop.ors_filterstr.bv_len = sprintf( buf, "(entryCSN<=%s)",
cf.f_av_value.bv_val );
cb.sc_response = findcsn_cb;
break;
case FIND_PRESENT:
- af.f_choice = LDAP_FILTER_AND;
- af.f_next = NULL;
- af.f_and = &cf;
- cf.f_choice = LDAP_FILTER_LE;
- cf.f_av_value = srs->sr_state.ctxcsn;
- cf.f_next = op->ors_filter;
- fop.ors_filter = ⁡
- filter2bv_x( &fop, fop.ors_filter, &fop.ors_filterstr );
+ fop.ors_filter = op->ors_filter;
+ fop.ors_filterstr = op->ors_filterstr;
fop.ors_attrsonly = 0;
fop.ors_attrs = uuid_anlist;
fop.ors_slimit = SLAP_NO_LIMIT;
break;
case FIND_PRESENT:
op->o_tmpfree( pcookie.uuids, op->o_tmpmemctx );
- op->o_tmpfree( fop.ors_filterstr.bv_val, op->o_tmpmemctx );
break;
}
rs.sr_flags = REP_ENTRY_MUSTRELEASE;
if ( opc->sreference ) {
rs.sr_ref = get_entry_referrals( op, rs.sr_entry );
- send_search_reference( op, &rs );
+ rs.sr_err = send_search_reference( op, &rs );
ber_bvarray_free( rs.sr_ref );
if ( !rs.sr_entry )
*e = NULL;
if ( rs.sr_entry->e_private )
rs.sr_flags = REP_ENTRY_MUSTRELEASE;
rs.sr_attrs = op->ors_attrs;
- send_search_entry( op, &rs );
+ rs.sr_err = send_search_entry( op, &rs );
if ( !rs.sr_entry )
*e = NULL;
break;
if ( opc->sreference ) {
struct berval bv = BER_BVNULL;
rs.sr_ref = &bv;
- send_search_reference( op, &rs );
+ rs.sr_err = send_search_reference( op, &rs );
} else {
- send_search_entry( op, &rs );
+ rs.sr_err = send_search_entry( op, &rs );
}
break;
default:
syncres *sr;
Entry *e;
opcookie opc;
- int rc;
+ int rc = 0;
opc.son = on;
- op->o_bd->bd_info = (BackendInfo *)on->on_info;
for (;;) {
ldap_pvt_thread_mutex_lock( &so->s_mutex );
e = NULL;
if ( sr->s_mode != LDAP_SYNC_DELETE ) {
- rc = be_entry_get_rw( op, &opc.sndn, NULL, NULL, 0, &e );
+ rc = overlay_entry_get_ov( op, &opc.sndn, NULL, NULL, 0, &e, on );
if ( rc ) {
+ Debug( LDAP_DEBUG_SYNC, "syncprov_qplay: failed to get %s, "
+ "error (%d), ignoring...\n", opc.sndn.bv_val, rc, 0 );
ch_free( sr );
+ rc = 0;
continue;
}
}
rc = syncprov_sendresp( op, &opc, so, &e, sr->s_mode );
if ( e ) {
- be_entry_release_rw( op, e, 0 );
+ overlay_entry_release_ov( op, e, 0, on );
}
ch_free( sr );
if ( rc )
break;
}
- op->o_bd->bd_info = (BackendInfo *)on;
return rc;
}
OperationBuffer opbuf;
Operation *op;
BackendDB be;
+ int rc;
op = (Operation *) &opbuf;
*op = *so->s_op;
op->o_private = NULL;
op->o_callback = NULL;
- syncprov_qplay( op, on, so );
+ rc = syncprov_qplay( op, on, so );
/* decrement use count... */
syncprov_free_syncop( so );
/* wait until we get explicitly scheduled again */
ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
- ldap_pvt_runqueue_stoptask( &slapd_rq, so->s_qtask );
- ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 1 );
+ ldap_pvt_runqueue_stoptask( &slapd_rq, rtask );
+ if ( rc == 0 ) {
+ ldap_pvt_runqueue_resched( &slapd_rq, rtask, 1 );
+ } else {
+ /* bail out on any error */
+ ldap_pvt_runqueue_remove( &slapd_rq, rtask );
+ }
ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
return NULL;
}
+/* Start the task to play back queued psearch responses */
+static void
+syncprov_qstart( syncops *so )
+{
+ int wake=0;
+ ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
+ if ( !so->s_qtask ) {
+ so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
+ syncprov_qtask, so, "syncprov_qtask",
+ so->s_op->o_conn->c_peer_name.bv_val );
+ ++so->s_inuse;
+ wake = 1;
+ } else {
+ if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
+ !so->s_qtask->next_sched.tv_sec ) {
+ so->s_qtask->interval.tv_sec = 0;
+ ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
+ so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
+ ++so->s_inuse;
+ wake = 1;
+ }
+ }
+ ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+ if ( wake )
+ slap_wake_listener();
+}
+
/* Queue a persistent search response */
static int
syncprov_qresp( opcookie *opc, syncops *so, int mode )
so->s_flags |= PS_FIND_BASE;
}
if ( so->s_flags & PS_IS_DETACHED ) {
- ldap_pvt_thread_mutex_lock( &slapd_rq.rq_mutex );
- if ( !so->s_qtask ) {
- so->s_qtask = ldap_pvt_runqueue_insert( &slapd_rq, RUNQ_INTERVAL,
- syncprov_qtask, so, "syncprov_qtask",
- so->s_op->o_conn->c_peer_name.bv_val );
- ++so->s_inuse;
- } else {
- if (!ldap_pvt_runqueue_isrunning( &slapd_rq, so->s_qtask ) &&
- !so->s_qtask->next_sched.tv_sec ) {
- so->s_qtask->interval.tv_sec = 0;
- ldap_pvt_runqueue_resched( &slapd_rq, so->s_qtask, 0 );
- so->s_qtask->interval.tv_sec = RUNQ_INTERVAL;
- ++so->s_inuse;
- }
- }
- ldap_pvt_thread_mutex_unlock( &slapd_rq.rq_mutex );
+ syncprov_qstart( so );
}
ldap_pvt_thread_mutex_unlock( &so->s_mutex );
return LDAP_SUCCESS;
fbase_cookie fc;
syncops *ss, *sprev, *snext;
- Entry *e;
+ Entry *e = NULL;
Attribute *a;
int rc;
struct berval newdn;
int freefdn = 0;
+ BackendDB *b0 = op->o_bd, db;
fc.fdn = &op->o_req_ndn;
/* compute new DN */
freefdn = 1;
}
if ( op->o_tag != LDAP_REQ_ADD ) {
- op->o_bd->bd_info = (BackendInfo *)on->on_info;
- rc = be_entry_get_rw( op, fc.fdn, NULL, NULL, 0, &e );
+ if ( !SLAP_ISOVERLAY( op->o_bd )) {
+ db = *op->o_bd;
+ op->o_bd = &db;
+ }
+ rc = overlay_entry_get_ov( op, fc.fdn, NULL, NULL, 0, &e, on );
/* If we're sending responses now, make a copy and unlock the DB */
if ( e && !saveit ) {
Entry *e2 = entry_dup( e );
- be_entry_release_rw( op, e, 0 );
+ overlay_entry_release_ov( op, e, 0, on );
e = e2;
}
- op->o_bd->bd_info = (BackendInfo *)on;
- if ( rc ) return;
+ if ( rc ) {
+ op->o_bd = b0;
+ return;
+ }
} else {
e = op->ora_e;
}
if ( freefdn ) {
op->o_tmpfree( fc.fdn->bv_val, op->o_tmpmemctx );
}
+ op->o_bd = b0;
}
static int
struct berval bv[2];
slap_callback cb = {0};
- mod.sml_values = bv;
- bv[1].bv_val = NULL;
- bv[0] = si->si_ctxcsn;
+ /* If ctxcsn is empty, delete it */
+ if ( BER_BVISEMPTY( &si->si_ctxcsn )) {
+ mod.sml_values = NULL;
+ } else {
+ mod.sml_values = bv;
+ bv[1].bv_val = NULL;
+ bv[0] = si->si_ctxcsn;
+ }
mod.sml_nvalues = NULL;
mod.sml_desc = slap_schema.si_ad_contextCSN;
mod.sml_op = LDAP_MOD_REPLACE;
}
static void
-syncprov_add_slog( Operation *op, struct berval *csn )
+syncprov_add_slog( Operation *op )
{
opcookie *opc = op->o_callback->sc_private;
slap_overinst *on = opc->son;
{
/* Allocate a record. UUIDs are not NUL-terminated. */
se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
- csn->bv_len + 1 );
+ op->o_csn.bv_len + 1 );
se->se_next = NULL;
se->se_tag = op->o_tag;
- se->se_uuid.bv_val = (char *)(se+1);
+ se->se_uuid.bv_val = (char *)(&se[1]);
AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
se->se_uuid.bv_len = opc->suuid.bv_len;
se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len;
- AC_MEMCPY( se->se_csn.bv_val, csn->bv_val, csn->bv_len );
- se->se_csn.bv_val[csn->bv_len] = '\0';
- se->se_csn.bv_len = csn->bv_len;
+ AC_MEMCPY( se->se_csn.bv_val, op->o_csn.bv_val, op->o_csn.bv_len );
+ se->se_csn.bv_val[op->o_csn.bv_len] = '\0';
+ se->se_csn.bv_len = op->o_csn.bv_len;
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
if ( sl->sl_head ) {
sl->sl_mincsn.bv_len = se->se_csn.bv_len;
ch_free( se );
sl->sl_num--;
- if ( !sl->sl_head ) {
- sl->sl_tail = NULL;
- }
}
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
}
/* enter with sl->sl_mutex locked, release before returning */
static void
syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
- struct berval *oldcsn, struct berval *ctxcsn )
+ sync_control *srs, struct berval *ctxcsn )
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
slog_entry *se;
int i, j, ndel, num, nmods, mmods;
+ char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
BerVarray uuids;
+ struct berval delcsn;
if ( !sl->sl_num ) {
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) +
num * UUID_LEN, op->o_tmpmemctx );
-
uuids[0].bv_val = (char *)(uuids + num + 1);
+ delcsn.bv_len = 0;
+ delcsn.bv_val = cbuf;
+
/* Make a copy of the relevant UUIDs. Put the Deletes up front
* and everything else at the end. Do this first so we can
* unlock the list mutex.
*/
+ Debug( LDAP_DEBUG_SYNC, "srs csn %s\n", srs->sr_state.ctxcsn.bv_val, 0, 0 );
for ( se=sl->sl_head; se; se=se->se_next ) {
- if ( ber_bvcmp( &se->se_csn, oldcsn ) < 0 ) continue;
- if ( ber_bvcmp( &se->se_csn, ctxcsn ) > 0 ) break;
+ Debug( LDAP_DEBUG_SYNC, "log csn %s\n", se->se_csn.bv_val, 0, 0 );
+ ndel = ber_bvcmp( &se->se_csn, &srs->sr_state.ctxcsn );
+ if ( ndel <= 0 ) {
+ Debug( LDAP_DEBUG_SYNC, "cmp %d, too old\n", ndel, 0, 0 );
+ continue;
+ }
+ ndel = ber_bvcmp( &se->se_csn, ctxcsn );
+ if ( ndel > 0 ) {
+ Debug( LDAP_DEBUG_SYNC, "cmp %d, too new\n", ndel, 0, 0 );
+ break;
+ }
if ( se->se_tag == LDAP_REQ_DELETE ) {
j = i;
i++;
+ AC_MEMCPY( cbuf, se->se_csn.bv_val, se->se_csn.bv_len );
+ delcsn.bv_len = se->se_csn.bv_len;
+ delcsn.bv_val[delcsn.bv_len] = '\0';
} else {
nmods++;
j = num - nmods;
mf.f_av_value = uuids[i];
cb.sc_private = NULL;
fop.ors_slimit = 1;
+ frs.sr_nentries = 0;
rc = fop.o_bd->be_search( &fop, &frs );
/* If entry was not found, add to delete list */
fop.o_bd->bd_info = (BackendInfo *)on;
}
if ( ndel ) {
+ struct berval cookie;
+
+ slap_compose_sync_cookie( op, &cookie, &delcsn, srs->sr_state.rid );
uuids[ndel].bv_val = NULL;
- syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, 0, uuids, 1 );
+ syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, &cookie, 0, uuids, 1 );
+ op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
}
+ op->o_tmpfree( uuids, op->o_tmpmemctx );
}
static int
if ( rs->sr_err == LDAP_SUCCESS )
{
- struct berval maxcsn = BER_BVNULL, curcsn = BER_BVNULL;
+ struct berval maxcsn = BER_BVNULL;
char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
+ int do_check = 0, have_psearches;
/* Update our context CSN */
cbuf[0] = '\0';
- ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
- slap_get_commit_csn( op, &maxcsn, &curcsn );
+ ldap_pvt_thread_rdwr_wlock( &si->si_csn_rwlock );
+ slap_get_commit_csn( op, &maxcsn );
if ( !BER_BVISNULL( &maxcsn ) ) {
strcpy( cbuf, maxcsn.bv_val );
if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn ) > 0 ) {
/* Don't do any processing for consumer contextCSN updates */
if ( SLAP_SYNC_SHADOW( op->o_bd ) &&
op->o_msgid == SLAP_SYNC_UPDATE_MSGID ) {
- ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+ ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
return SLAP_CB_CONTINUE;
}
si->si_numops++;
if ( si->si_chkops || si->si_chktime ) {
- int do_check=0;
if ( si->si_chkops && si->si_numops >= si->si_chkops ) {
do_check = 1;
si->si_numops = 0;
do_check = 1;
si->si_chklast = op->o_time;
}
- if ( do_check ) {
- syncprov_checkpoint( op, rs, on );
- }
}
- ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+ ldap_pvt_thread_rdwr_wunlock( &si->si_csn_rwlock );
+
+ if ( do_check ) {
+ ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
+ syncprov_checkpoint( op, rs, on );
+ ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
+ }
opc->sctxcsn.bv_len = maxcsn.bv_len;
opc->sctxcsn.bv_val = cbuf;
/* Handle any persistent searches */
- if ( si->si_ops ) {
+ ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+ have_psearches = ( si->si_ops != NULL );
+ ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+ if ( have_psearches ) {
switch(op->o_tag) {
case LDAP_REQ_ADD:
case LDAP_REQ_MODIFY:
/* Add any log records */
if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) {
- syncprov_add_slog( op, &curcsn );
+ syncprov_add_slog( op );
}
}
a.a_vals = bv;
a.a_nvals = a.a_vals;
- ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
+ ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
rs->sr_err = access_allowed( op, &e, op->oq_compare.rs_ava->aa_desc,
&op->oq_compare.rs_ava->aa_value, ACL_COMPARE, NULL );
return_results:;
- ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+ ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
send_ldap_result( op, rs );
{
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
syncprov_info_t *si = on->on_bi.bi_private;
+ slap_callback *cb;
+ opcookie *opc;
+ int have_psearches, cbsize;
+
+ ldap_pvt_thread_mutex_lock( &si->si_ops_mutex );
+ have_psearches = ( si->si_ops != NULL );
+ ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
+
+ cbsize = sizeof(slap_callback) + sizeof(opcookie) +
+ (have_psearches ? sizeof(modinst) : 0 );
- slap_callback *cb = op->o_tmpcalloc(1, sizeof(slap_callback)+
- sizeof(opcookie) +
- (si->si_ops ? sizeof(modinst) : 0 ),
- op->o_tmpmemctx);
- opcookie *opc = (opcookie *)(cb+1);
+ cb = op->o_tmpcalloc(1, cbsize, op->o_tmpmemctx);
+ opc = (opcookie *)(cb+1);
opc->son = on;
cb->sc_response = syncprov_op_response;
cb->sc_cleanup = syncprov_op_cleanup;
/* If there are active persistent searches, lock this operation.
* See seqmod.c for the locking logic on its own.
*/
- if ( si->si_ops ) {
+ if ( have_psearches ) {
modtarget *mt, mtdummy;
modinst *mi;
}
}
- if (( si->si_ops || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
+ if (( have_psearches || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
syncprov_matchops( op, opc, 1 );
return SLAP_CB_CONTINUE;
op2->ors_filterstr.bv_val = ptr;
strcpy( ptr, so->s_filterstr.bv_val );
op2->ors_filterstr.bv_len = so->s_filterstr.bv_len;
- op2->ors_filter = str2filter( ptr );
+
+ /* Skip the AND/GE clause that we stuck on in front */
+ if ( so->s_flags & PS_FIX_FILTER ) {
+ op2->ors_filter = op->ors_filter->f_and->f_next;
+ so->s_flags ^= PS_FIX_FILTER;
+ } else {
+ op2->ors_filter = op->ors_filter;
+ }
+ op2->ors_filter = filter_dup( op2->ors_filter, NULL );
so->s_op = op2;
/* Copy any cached group ACLs individually */
op2->o_do_not_cache = 1;
/* Add op2 to conn so abandon will find us */
- ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
op->o_conn->c_n_ops_executing++;
op->o_conn->c_n_ops_completed--;
LDAP_STAILQ_INSERT_TAIL( &op->o_conn->c_ops, op2, o_next );
so->s_flags |= PS_IS_DETACHED;
- ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
+
+ /* Prevent anyone else from trying to send a result for this op */
+ op->o_abandon = 1;
}
static int
{
searchstate *ss = op->o_callback->sc_private;
slap_overinst *on = ss->ss_on;
+ syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
sync_control *srs = op->o_controls[slap_cids.sc_LDAPsync];
if ( rs->sr_type == REP_SEARCH || rs->sr_type == REP_SEARCHREF ) {
return SLAP_CB_CONTINUE;
}
a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryCSN );
+ if ( a == NULL && rs->sr_operational_attrs != NULL ) {
+ a = attr_find( rs->sr_operational_attrs, slap_schema.si_ad_entryCSN );
+ }
if ( a ) {
- /* Make sure entry is less than the snaphot'd contextCSN */
- if ( ber_bvcmp( &a->a_nvals[0], &ss->ss_ctxcsn ) > 0 )
+ /* If not a persistent search */
+ /* Make sure entry is less than the snapshot'd contextCSN */
+ if ( !ss->ss_so && ber_bvcmp( &a->a_nvals[0], &ss->ss_ctxcsn ) > 0 ) {
+ Debug( LDAP_DEBUG_SYNC, "Entry %s CSN %s greater than snapshot %s\n",
+ rs->sr_entry->e_name.bv_val,
+ a->a_nvals[0].bv_val,
+ ss->ss_ctxcsn.bv_val );
return LDAP_SUCCESS;
+ }
/* Don't send the ctx entry twice */
if ( !BER_BVISNULL( &srs->sr_state.ctxcsn ) &&
- bvmatch( &a->a_nvals[0], &srs->sr_state.ctxcsn ) )
+ bvmatch( &a->a_nvals[0], &srs->sr_state.ctxcsn ) ) {
+ Debug( LDAP_DEBUG_SYNC, "Entry %s CSN %s matches ctx %s\n",
+ rs->sr_entry->e_name.bv_val,
+ a->a_nvals[0].bv_val,
+ srs->sr_state.ctxcsn.bv_val );
return LDAP_SUCCESS;
+ }
}
rs->sr_ctrls = op->o_tmpalloc( sizeof(LDAPControl *)*2,
op->o_tmpmemctx );
rs->sr_ctrls[1] = NULL;
- rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
- LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL );
+ /* If we're in delta-sync mode, always send a cookie */
+ if ( si->si_nopres && si->si_usehint && a ) {
+ struct berval cookie;
+ slap_compose_sync_cookie( op, &cookie, a->a_nvals, srs->sr_state.rid );
+ rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
+ LDAP_SYNC_ADD, rs->sr_ctrls, 0, 1, &cookie );
+ } else {
+ rs->sr_err = syncprov_state_ctrl( op, rs, rs->sr_entry,
+ LDAP_SYNC_ADD, rs->sr_ctrls, 0, 0, NULL );
+ }
} else if ( rs->sr_type == REP_RESULT && rs->sr_err == LDAP_SUCCESS ) {
struct berval cookie;
op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
} else {
/* It's RefreshAndPersist, transition to Persist phase */
- syncprov_sendinfo( op, rs, ( ss->ss_present && rs->sr_nentries ) ?
+ syncprov_sendinfo( op, rs, ss->ss_present ?
LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
&cookie, 1, NULL, 0 );
op->o_tmpfree( cookie.bv_val, op->o_tmpmemctx );
/* Detach this Op from frontend control */
ldap_pvt_thread_mutex_lock( &ss->ss_so->s_mutex );
+ ldap_pvt_thread_mutex_lock( &op->o_conn->c_mutex );
- /* Turn off the refreshing flag */
- ss->ss_so->s_flags ^= PS_IS_REFRESHING;
+ /* But not if this connection was closed along the way */
+ if ( op->o_abandon ) {
+ ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
+ ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
+ /* syncprov_ab_cleanup will free this syncop */
+ return SLAPD_ABANDON;
- syncprov_detach_op( op, ss->ss_so, on );
+ } else {
+ /* Turn off the refreshing flag */
+ ss->ss_so->s_flags ^= PS_IS_REFRESHING;
+
+ syncprov_detach_op( op, ss->ss_so, on );
+
+ ldap_pvt_thread_mutex_unlock( &op->o_conn->c_mutex );
+
+ /* If there are queued responses, fire them off */
+ if ( ss->ss_so->s_res )
+ syncprov_qstart( ss->ss_so );
+ }
ldap_pvt_thread_mutex_unlock( &ss->ss_so->s_mutex );
return LDAP_SUCCESS;
/* syncprov_findbase expects to be called as a callback... */
sc.sc_private = &opc;
opc.son = on;
+ ldap_pvt_thread_mutex_init( &so.s_mutex );
cb = op->o_callback;
op->o_callback = ≻
rs->sr_err = syncprov_findbase( op, &fc );
op->o_callback = cb;
+ ldap_pvt_thread_mutex_destroy( &so.s_mutex );
if ( rs->sr_err != LDAP_SUCCESS ) {
send_ldap_result( op, rs );
}
/* snapshot the ctxcsn */
- ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
+ ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
strcpy( csnbuf, si->si_ctxcsnbuf );
ctxcsn.bv_len = si->si_ctxcsn.bv_len;
- ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+ ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
ctxcsn.bv_val = csnbuf;
/* If we have a cookie, handle the PRESENT lookups */
sl=si->si_logs;
if ( sl ) {
ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
- if ( ber_bvcmp( &srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) {
+ /* Are there any log entries, and is the consumer state
+ * present in the session log?
+ */
+ if ( sl->sl_num > 0 && ber_bvcmp( &srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) {
do_present = 0;
/* mutex is unlocked in playlog */
- syncprov_playlog( op, rs, sl, &srs->sr_state.ctxcsn, &ctxcsn );
+ syncprov_playlog( op, rs, sl, srs, &ctxcsn );
} else {
ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
}
fava->f_next = op->ors_filter;
op->ors_filter = fand;
filter2bv_x( op, op->ors_filter, &op->ors_filterstr );
+ if ( sop )
+ sop->s_flags |= PS_FIX_FILTER;
}
/* Let our callback add needed info to returned entries */
*ap = a;
}
- ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
+ ldap_pvt_thread_rdwr_rlock( &si->si_csn_rwlock );
if ( !ap ) {
strcpy( a->a_vals[0].bv_val, si->si_ctxcsnbuf );
} else {
ber_dupbv( &a->a_vals[0], &si->si_ctxcsn );
}
- ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+ ldap_pvt_thread_rdwr_runlock( &si->si_csn_rwlock );
}
}
return SLAP_CB_CONTINUE;
switch ( c->type ) {
case SP_CHKPT:
if ( lutil_atoi( &si->si_chkops, c->argv[1] ) != 0 ) {
- sprintf( c->msg, "%s unable to parse checkpoint ops # \"%s\"",
+ snprintf( c->msg, sizeof( c->msg ), "%s unable to parse checkpoint ops # \"%s\"",
c->argv[0], c->argv[1] );
- Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
+ "%s: %s\n", c->log, c->msg, 0 );
return ARG_BAD_CONF;
}
if ( si->si_chkops <= 0 ) {
- sprintf( c->msg, "%s invalid checkpoint ops # \"%d\"",
+ snprintf( c->msg, sizeof( c->msg ), "%s invalid checkpoint ops # \"%d\"",
c->argv[0], si->si_chkops );
- Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
+ "%s: %s\n", c->log, c->msg, 0 );
return ARG_BAD_CONF;
}
if ( lutil_atoi( &si->si_chktime, c->argv[2] ) != 0 ) {
- sprintf( c->msg, "%s unable to parse checkpoint time \"%s\"",
+ snprintf( c->msg, sizeof( c->msg ), "%s unable to parse checkpoint time \"%s\"",
c->argv[0], c->argv[1] );
- Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
+ "%s: %s\n", c->log, c->msg, 0 );
return ARG_BAD_CONF;
}
if ( si->si_chktime <= 0 ) {
- sprintf( c->msg, "%s invalid checkpoint time \"%d\"",
+ snprintf( c->msg, sizeof( c->msg ), "%s invalid checkpoint time \"%d\"",
c->argv[0], si->si_chkops );
- Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
+ "%s: %s\n", c->log, c->msg, 0 );
return ARG_BAD_CONF;
}
si->si_chktime *= 60;
int size = c->value_int;
if ( size < 0 ) {
- sprintf( c->msg, "%s size %d is negative",
+ snprintf( c->msg, sizeof( c->msg ), "%s size %d is negative",
c->argv[0], size );
- Debug( LDAP_DEBUG_CONFIG, "%s: %s\n", c->log, c->msg, 0 );
+ Debug( LDAP_DEBUG_CONFIG|LDAP_DEBUG_NONE,
+ "%s: %s\n", c->log, c->msg, 0 );
return ARG_BAD_CONF;
}
sl = si->si_logs;
slap_overinst *on = (slap_overinst *) be->bd_info;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
- Connection conn;
- OperationBuffer opbuf;
+ Connection conn = { 0 };
+ OperationBuffer opbuf = { 0 };
char ctxcsnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
Operation *op = (Operation *) &opbuf;
- Entry *e;
+ Entry *e = NULL;
Attribute *a;
int rc;
void *thrctx = NULL;
+ if ( !SLAP_LASTMOD( be )) {
+ Debug( LDAP_DEBUG_ANY,
+ "syncprov_db_open: invalid config, lastmod must be enabled\n", 0, 0, 0 );
+ return -1;
+ }
+
if ( slapMode & SLAP_TOOL_MODE ) {
return 0;
}
ctxcsnbuf[0] = '\0';
- op->o_bd->bd_info = on->on_info->oi_orig;
- rc = be_entry_get_rw( op, be->be_nsuffix, NULL,
- slap_schema.si_ad_contextCSN, 0, &e );
+ rc = overlay_entry_get_ov( op, be->be_nsuffix, NULL,
+ slap_schema.si_ad_contextCSN, 0, &e, on );
if ( e ) {
ldap_pvt_thread_t tid;
si->si_ctxcsnbuf[si->si_ctxcsn.bv_len] = '\0';
strcpy( ctxcsnbuf, si->si_ctxcsnbuf );
}
- be_entry_release_rw( op, e, 0 );
- op->o_bd->bd_info = (BackendInfo *)on;
- op->o_req_dn = be->be_suffix[0];
- op->o_req_ndn = be->be_nsuffix[0];
- op->ors_scope = LDAP_SCOPE_SUBTREE;
- ldap_pvt_thread_create( &tid, 0, syncprov_db_otask, op );
- ldap_pvt_thread_join( tid, NULL );
- } else if ( SLAP_SYNC_SHADOW( op->o_bd )) {
- /* If we're also a consumer, and we didn't find the context entry,
- * then don't generate anything, wait for our provider to send it
- * to us.
- */
- goto out;
+ overlay_entry_release_ov( op, e, 0, on );
+ if ( !BER_BVISEMPTY( &si->si_ctxcsn ) ) {
+ op->o_req_dn = be->be_suffix[0];
+ op->o_req_ndn = be->be_nsuffix[0];
+ op->ors_scope = LDAP_SCOPE_SUBTREE;
+ ldap_pvt_thread_create( &tid, 0, syncprov_db_otask, op );
+ ldap_pvt_thread_join( tid, NULL );
+ }
}
if ( BER_BVISEMPTY( &si->si_ctxcsn ) ) {
+ if ( SLAP_SYNC_SHADOW( op->o_bd )) {
+ /* If we're also a consumer, and we didn't get a contextCSN,
+ * then don't generate anything, wait for our provider to send it
+ * to us.
+ */
+ goto out;
+ }
si->si_ctxcsn.bv_len = sizeof( si->si_ctxcsnbuf );
slap_get_csn( op, &si->si_ctxcsn, 0 );
}
out:
op->o_bd->bd_info = (BackendInfo *)on;
- ldap_pvt_thread_pool_context_reset( thrctx );
return 0;
}
return 0;
}
if ( si->si_numops ) {
- Connection conn;
+ Connection conn = {0};
OperationBuffer opbuf;
Operation *op = (Operation *) &opbuf;
SlapReply rs = {REP_RESULT};
op->o_dn = be->be_rootdn;
op->o_ndn = be->be_rootndn;
syncprov_checkpoint( op, &rs, on );
- ldap_pvt_thread_pool_context_reset( thrctx );
}
return 0;
slap_overinst *on = (slap_overinst *)be->bd_info;
syncprov_info_t *si;
+ if ( SLAP_ISGLOBALOVERLAY( be ) ) {
+ Debug( LDAP_DEBUG_ANY,
+ "syncprov must be instantiated within a database.\n",
+ 0, 0, 0 );
+ return 1;
+ }
+
si = ch_calloc(1, sizeof(syncprov_info_t));
on->on_bi.bi_private = si;
- ldap_pvt_thread_mutex_init( &si->si_csn_mutex );
+ ldap_pvt_thread_rdwr_init( &si->si_csn_rwlock );
ldap_pvt_thread_mutex_init( &si->si_ops_mutex );
ldap_pvt_thread_mutex_init( &si->si_mods_mutex );
si->si_ctxcsn.bv_val = si->si_ctxcsnbuf;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
if ( si ) {
+ if ( si->si_logs ) {
+ slog_entry *se = si->si_logs->sl_head;
+
+ while ( se ) {
+ slog_entry *se_next = se->se_next;
+ ch_free( se );
+ se = se_next;
+ }
+
+ ch_free( si->si_logs );
+ }
ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex );
ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );
- ldap_pvt_thread_mutex_destroy( &si->si_csn_mutex );
+ ldap_pvt_thread_rdwr_destroy( &si->si_csn_rwlock );
ch_free( si );
}
sr->sr_rhint = rhint;
if (!BER_BVISNULL(&cookie)) {
ber_dupbv_x( &sr->sr_state.octet_str, &cookie, op->o_tmpmemctx );
- slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx );
- if ( sr->sr_state.rid == -1 ) {
+ if ( slap_parse_sync_cookie( &sr->sr_state, op->o_tmpmemctx ) ||
+ sr->sr_state.rid == -1 ) {
rs->sr_text = "Sync control : cookie parsing error";
return LDAP_PROTOCOL_ERROR;
}
int rc;
rc = register_supported_control( LDAP_CONTROL_SYNC,
- SLAP_CTRL_HIDE|SLAP_CTRL_SEARCH, NULL,
+ SLAP_CTRL_SEARCH, NULL,
syncprov_parseCtrl, &slap_cids.sc_LDAPsync );
if ( rc != LDAP_SUCCESS ) {
Debug( LDAP_DEBUG_ANY,