typedef struct syncprov_info_t {
syncops *si_ops;
struct berval si_ctxcsn; /* ldapsync context */
- int si_gotcsn; /* is our ctxcsn up to date? */
+ int si_chkops; /* checkpointing info */
+ int si_chktime;
+ int si_numops; /* number of ops since last checkpoint */
+ time_t si_chklast; /* time of last checkpoint */
Avlnode *si_mods; /* entries being modified */
ldap_pvt_thread_mutex_t si_csn_mutex;
ldap_pvt_thread_mutex_t si_ops_mutex;
}
/* syncprov_findcsn:
- * This function has three different purposes, but they all use a search
+ * This function has two different purposes, but they both use a search
* that filters on entryCSN so they're combined here.
- * 1: when the current contextCSN is unknown (i.e., at server start time)
- * and a syncrepl search has arrived with a cookie, we search for all entries
- * with CSN >= the cookie CSN, and store the maximum as our contextCSN. Also,
- * we expect to find the cookie CSN in the search results, and note if we did
- * or not. If not, we assume the cookie is stale. (This may be too restrictive,
- * notice case 2.)
- *
- * 2: when the current contextCSN is known and we have a sync cookie, we search
+ * 1: when the current contextCSN is known and we have a sync cookie, we search
* for one entry with CSN <= the cookie CSN. (Used to search for =.) If an
- * entry is found, the cookie CSN is valid, otherwise it is stale. Case 1 is
- * considered a special case of case 2, and both are generally called the
- * "find CSN" task.
+ * entry is found, the cookie CSN is valid, otherwise it is stale.
*
- * 3: during a refresh phase, we search for all entries with CSN <= the cookie
+ * 2: during a refresh phase, we search for all entries with CSN <= the cookie
* CSN, and generate Present records for them. We always collect this result
* in SyncID sets, even if there's only one match.
*/
#define FIND_CSN 1
#define FIND_PRESENT 2
-typedef struct fcsn_cookie {
- struct berval maxcsn;
- int gotmatch;
-} fcsn_cookie;
-
static int
findcsn_cb( Operation *op, SlapReply *rs )
{
slap_callback *sc = op->o_callback;
if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
- /* If the private pointer is set, it points to an fcsn_cookie
- * and we want to record the maxcsn and match state.
- */
- if ( sc->sc_private ) {
- int i;
- fcsn_cookie *fc = sc->sc_private;
- sync_control *srs = op->o_controls[sync_cid];
- Attribute *a = attr_find(rs->sr_entry->e_attrs,
- slap_schema.si_ad_entryCSN );
- i = ber_bvcmp( &a->a_vals[0], srs->sr_state.ctxcsn );
- if ( i == 0 ) fc->gotmatch = 1;
- i = ber_bvcmp( &a->a_vals[0], &fc->maxcsn );
- if ( i > 0 ) {
- fc->maxcsn.bv_len = a->a_vals[0].bv_len;
- strcpy(fc->maxcsn.bv_val, a->a_vals[0].bv_val );
- }
- } else {
- /* Otherwise, if the private pointer is not set, we just
- * want to know if any entry matched the filter.
- */
- sc->sc_private = (void *)1;
- }
+ sc->sc_private = (void *)1;
}
return LDAP_SUCCESS;
}
Filter cf;
AttributeAssertion eq;
int rc;
- fcsn_cookie fcookie;
fpres_cookie pcookie;
int locked = 0;
sync_control *srs = op->o_controls[sync_cid];
fbuf.bv_val = buf;
if ( mode == FIND_CSN ) {
- if ( !si->si_gotcsn ) {
- /* If we don't know the current ctxcsn, find it */
- ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
- locked = 1;
- }
- if ( !si->si_gotcsn ) {
- cf.f_choice = LDAP_FILTER_GE;
- fop.ors_attrsonly = 0;
- fop.ors_attrs = csn_anlist;
- fop.ors_slimit = SLAP_NO_LIMIT;
- cb.sc_private = &fcookie;
- fcookie.maxcsn.bv_val = cbuf;
- fcookie.maxcsn.bv_len = 0;
- fcookie.gotmatch = 0;
- fbuf.bv_len = sprintf( buf, "(entryCSN>=%s)", srs->sr_state.ctxcsn->bv_val );
- } else {
- if ( locked ) {
- ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
- locked = 0;
- }
- cf.f_choice = LDAP_FILTER_LE;
- fop.ors_attrsonly = 1;
- fop.ors_attrs = slap_anlist_no_attrs;
- fop.ors_slimit = 1;
- cb.sc_private = NULL;
- fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", srs->sr_state.ctxcsn->bv_val );
- }
+ fop.ors_attrsonly = 1;
+ fop.ors_attrs = slap_anlist_no_attrs;
+ fop.ors_slimit = 1;
+ cb.sc_private = NULL;
cb.sc_response = findcsn_cb;
} else if ( mode == FIND_PRESENT ) {
- cf.f_choice = LDAP_FILTER_LE;
fop.ors_attrsonly = 0;
fop.ors_attrs = uuid_anlist;
fop.ors_slimit = SLAP_NO_LIMIT;
cb.sc_response = findpres_cb;
pcookie.num = 0;
pcookie.uuids = NULL;
- fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", srs->sr_state.ctxcsn->bv_val );
}
+ cf.f_choice = LDAP_FILTER_LE;
cf.f_ava = &eq;
cf.f_av_desc = slap_schema.si_ad_entryCSN;
cf.f_av_value = *srs->sr_state.ctxcsn;
cf.f_next = NULL;
+ fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)",
+ srs->sr_state.ctxcsn->bv_val );
fop.o_callback = &cb;
fop.ors_tlimit = SLAP_NO_LIMIT;
fop.o_bd->bd_info = (BackendInfo *)on;
if ( mode == FIND_CSN ) {
- if ( !si->si_gotcsn ) {
- strcpy(si->si_ctxcsnbuf, fcookie.maxcsn.bv_val);
- si->si_ctxcsn.bv_len = fcookie.maxcsn.bv_len;
- si->si_gotcsn = 1;
- ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
- if ( fcookie.gotmatch ) return LDAP_SUCCESS;
-
- } else {
- if ( cb.sc_private ) return LDAP_SUCCESS;
- }
+ if ( cb.sc_private ) return LDAP_SUCCESS;
} else if ( mode == FIND_PRESENT ) {
return LDAP_SUCCESS;
}
op->o_tmpfree(cb, op->o_tmpmemctx);
}
+static void
+syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
+{
+ syncprov_info_t *si = on->on_bi.bi_private;
+ Modifications mod;
+ Operation opm;
+ struct berval bv[2];
+ BackendInfo *orig;
+ slap_callback cb = {0};
+
+ mod.sml_values = bv;
+ bv[1].bv_val = NULL;
+ bv[0] = si->si_ctxcsn;
+ mod.sml_nvalues = NULL;
+ mod.sml_desc = slap_schema.si_ad_contextCSN;
+ mod.sml_op = LDAP_MOD_REPLACE;
+ mod.sml_next = NULL;
+
+ cb.sc_response = slap_null_cb;
+ opm = *op;
+ opm.o_tag = LDAP_REQ_MODIFY;
+ opm.o_callback = &cb;
+ opm.orm_modlist = &mod;
+ opm.o_req_dn = op->o_bd->be_suffix[0];
+ opm.o_req_ndn = op->o_bd->be_nsuffix[0];
+ orig = opm.o_bd->bd_info;
+ opm.o_bd->bd_info = on->on_info->oi_orig;
+ opm.o_bd->be_modify( &opm, rs );
+}
+
static int
syncprov_op_response( Operation *op, SlapReply *rs )
{
strcpy( si->si_ctxcsnbuf, cbuf );
si->si_ctxcsn.bv_len = maxcsn.bv_len;
}
- si->si_gotcsn = 1;
+ }
+
+ si->si_numops++;
+ if ( si->si_chkops || si->si_chktime ) {
+ int do_check=0;
+ if ( si->si_chkops && si->si_numops >= si->si_chkops ) {
+ do_check = 1;
+ si->si_numops = 0;
+ }
+ if ( si->si_chktime &&
+ (op->o_time - si->si_chklast >= si->si_chktime )) {
+ do_check = 1;
+ si->si_chklast = op->o_time;
+ }
+ if ( do_check ) {
+ syncprov_checkpoint( op, rs, on );
+ }
}
ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
}
}
- /* If we didn't get a cookie and we don't know our contextcsn, try to
- * find it anyway.
- */
- if ( !gotstate && !si->si_gotcsn ) {
- struct berval bv = BER_BVC("1"), *old;
-
- old = srs->sr_state.ctxcsn;
- srs->sr_state.ctxcsn = &bv;
- syncprov_findcsn( op, FIND_CSN );
- srs->sr_state.ctxcsn = old;
- }
-
/* Append CSN range to search filter, save original filter
* for persistent search evaluation
*/
if ( rs->sr_entry &&
dn_match( &rs->sr_entry->e_nname, op->o_bd->be_nsuffix )) {
- Attribute **ap;
-
- for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next ) ;
-
if ( SLAP_OPATTRS( rs->sr_attr_flags ) ||
ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) {
+ Attribute *a, **ap = NULL;
+
- Attribute *a = ch_malloc( sizeof(Attribute));
- a->a_desc = slap_schema.si_ad_contextCSN;
- a->a_vals = ch_malloc( 2 * sizeof(struct berval));
-
-#if 0 /* causes a deadlock */
- if ( !si->si_gotcsn ) {
- sync_control sc, *old;
- void *ctrls[SLAP_MAX_CIDS];
- struct berval bv = BER_BVC("1");
-
- if ( !op->o_controls ) {
- memset(ctrls, 0, sizeof(ctrls));
- op->o_controls = ctrls;
- } else {
- old = op->o_controls[sync_cid];
- }
- op->o_controls[sync_cid] = ≻
- sc.sr_state.ctxcsn = &bv;
- syncprov_findcsn( op, FIND_CSN );
- if ( op->o_controls == ctrls ) {
- op->o_controls = NULL;
- } else {
- op->o_controls[sync_cid] = old;
- }
+ for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) {
+ if ( a->a_desc == slap_schema.si_ad_contextCSN )
+ break;
}
-#endif
- ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
- ber_dupbv( &a->a_vals[0], &si->si_ctxcsn );
- ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
- a->a_vals[1].bv_val = NULL;
- a->a_nvals = a->a_vals;
- a->a_next = NULL;
- a->a_flags = 0;
+ if ( !a ) {
+ for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next );
+
+ a = ch_malloc( sizeof(Attribute));
+ a->a_desc = slap_schema.si_ad_contextCSN;
+ a->a_vals = ch_malloc( 2 * sizeof(struct berval));
+ a->a_vals[1].bv_val = NULL;
+ a->a_nvals = a->a_vals;
+ a->a_next = NULL;
+ a->a_flags = 0;
+ *ap = a;
+ }
- *ap = a;
+ ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
+ if ( !ap ) {
+ strcpy( a->a_vals[0].bv_val, si->si_ctxcsnbuf );
+ } else {
+ ber_dupbv( &a->a_vals[0], &si->si_ctxcsn );
+ }
+ ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
}
}
return LDAP_SUCCESS;
return SLAP_CONF_UNKNOWN;
}
+/* Cheating - we have no thread pool context for these functions,
+ * so make one.
+ */
+typedef struct thread_keys {
+ void *key;
+ void *data;
+ ldap_pvt_thread_pool_keyfree_t *free;
+} thread_keys;
+
+/* A fake thread context */
+static thread_keys thrctx[8];
+
+/* Read any existing contextCSN from the underlying db.
+ * Then search for any entries newer than that. If no value exists,
+ * just generate it. Cache whatever result.
+ */
+static int
+syncprov_db_open(
+ BackendDB *be
+)
+{
+ slap_overinst *on = (slap_overinst *) be->bd_info;
+ syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
+
+ char opbuf[OPERATION_BUFFER_SIZE];
+ Operation *op = (Operation *)opbuf;
+ Entry *e;
+ Attribute *a;
+ int rc;
+
+ memset(opbuf, 0, sizeof(opbuf));
+ op->o_hdr = (Opheader *)(op+1);
+ op->o_bd = be;
+ op->o_dn = be->be_rootdn;
+ op->o_ndn = be->be_rootndn;
+ op->o_threadctx = thrctx;
+ op->o_tmpmfuncs = &ch_mfuncs;
+
+ op->o_bd->bd_info = on->on_info->oi_orig;
+ rc = be_entry_get_rw( op, be->be_nsuffix, NULL,
+ slap_schema.si_ad_contextCSN, 0, &e );
+
+ if ( e ) {
+ a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN );
+ if ( a ) {
+ si->si_ctxcsn.bv_len = a->a_nvals[0].bv_len;
+ if ( si->si_ctxcsn.bv_len >= sizeof(si->si_ctxcsnbuf ))
+ si->si_ctxcsn.bv_len = sizeof(si->si_ctxcsnbuf)-1;
+ strncpy( si->si_ctxcsnbuf, a->a_nvals[0].bv_val,
+ si->si_ctxcsn.bv_len );
+ si->si_ctxcsnbuf[si->si_ctxcsn.bv_len] = '\0';
+ }
+ be_entry_release_r( op, e );
+ }
+ op->o_bd->bd_info = (BackendInfo *)on;
+ return 0;
+}
+
+/* Write the current contextCSN into the underlying db.
+ */
+static int
+syncprov_db_close(
+ BackendDB *be
+)
+{
+ slap_overinst *on = (slap_overinst *) be->bd_info;
+ syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
+ int i;
+
+ if ( si->si_numops ) {
+ Connection conn;
+ char opbuf[OPERATION_BUFFER_SIZE];
+ Operation *op = (Operation *)opbuf;
+ SlapReply rs = {REP_RESULT};
+
+ connection_fake_init( &conn, op, thrctx );
+ op->o_bd = be;
+ op->o_dn = be->be_rootdn;
+ op->o_ndn = be->be_rootndn;
+ syncprov_checkpoint( op, &rs, on );
+ }
+ for ( i=0; thrctx[i].key; i++) {
+ if ( thrctx[i].free )
+ thrctx[i].free( thrctx[i].key, thrctx[i].data );
+ }
+
+ return 0;
+}
+
static int
syncprov_db_init(
BackendDB *be
on->on_bi.bi_private = si;
ldap_pvt_thread_mutex_init( &si->si_csn_mutex );
ldap_pvt_thread_mutex_init( &si->si_ops_mutex );
+ ldap_pvt_thread_mutex_init( &si->si_mods_mutex );
si->si_ctxcsn.bv_val = si->si_ctxcsnbuf;
csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
if ( si ) {
+ ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex );
ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );
ldap_pvt_thread_mutex_destroy( &si->si_csn_mutex );
ch_free( si );
syncprov.on_bi.bi_db_init = syncprov_db_init;
syncprov.on_bi.bi_db_config = syncprov_db_config;
syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;
+ syncprov.on_bi.bi_db_open = syncprov_db_open;
+ syncprov.on_bi.bi_db_close = syncprov_db_close;
syncprov.on_bi.bi_op_abandon = syncprov_op_abandon;
syncprov.on_bi.bi_op_cancel = syncprov_op_abandon;