struct berval s_base; /* ndn of search base */
ID s_eid; /* entryID of search base */
Operation *s_op; /* search op */
- long s_sid;
- long s_rid;
+ int s_sid;
+ int s_rid;
struct berval s_filterstr;
int s_flags; /* search status */
int s_inuse; /* reference count */
syncops *sm_op;
} syncmatches;
+/* Session log data */
+typedef struct slog_entry {
+ struct slog_entry *se_next;
+ struct berval se_uuid;
+ struct berval se_csn;
+ ber_tag_t se_tag;
+} slog_entry;
+
+typedef struct sessionlog {
+ struct sessionlog *sl_next;
+ int sl_sid;
+ struct berval sl_mincsn;
+ int sl_num;
+ int sl_size;
+ slog_entry *sl_head;
+ slog_entry *sl_tail;
+ ldap_pvt_thread_mutex_t sl_mutex;
+} sessionlog;
+
/* The main state for this overlay */
typedef struct syncprov_info_t {
syncops *si_ops;
int si_numops; /* number of ops since last checkpoint */
time_t si_chklast; /* time of last checkpoint */
Avlnode *si_mods; /* entries being modified */
+ sessionlog *si_logs;
ldap_pvt_thread_mutex_t si_csn_mutex;
ldap_pvt_thread_mutex_t si_ops_mutex;
ldap_pvt_thread_mutex_t si_mods_mutex;
{
slap_callback *sc = op->o_callback;
fpres_cookie *pc = sc->sc_private;
+ Attribute *a;
int ret = SLAP_CB_CONTINUE;
- if ( rs->sr_type == REP_SEARCH ) {
- Attribute *a = attr_find( rs->sr_entry->e_attrs,
- slap_schema.si_ad_entryUUID );
+ switch ( rs->sr_type ) {
+ case REP_SEARCH:
+ a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID );
if ( a ) {
pc->uuids[pc->num].bv_val = pc->last;
AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val,
pc->uuids[pc->num].bv_val = NULL;
}
ret = LDAP_SUCCESS;
- if ( pc->num == SLAP_SYNCUUID_SET_SIZE ) {
- ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
- 0, pc->uuids, 0 );
- pc->uuids[pc->num].bv_val = pc->last;
- pc->num = 0;
- pc->last = pc->uuids[0].bv_val;
- }
-
- } else if ( rs->sr_type == REP_RESULT ) {
+ if ( pc->num != SLAP_SYNCUUID_SET_SIZE )
+ break;
+ /* FALLTHRU */
+ case REP_RESULT:
ret = rs->sr_err;
if ( pc->num ) {
ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
pc->num = 0;
pc->last = pc->uuids[0].bv_val;
}
+ break;
+ default:
+ break;
}
return ret;
}
opm.o_bd->be_modify( &opm, rs );
}
+static void
+syncprov_add_slog( Operation *op, struct berval *csn )
+{
+ opcookie *opc = op->o_callback->sc_private;
+ slap_overinst *on = opc->son;
+ syncprov_info_t *si = on->on_bi.bi_private;
+ sessionlog *sl;
+ slog_entry *se;
+
+ for ( sl = si->si_logs; sl; sl=sl->sl_next ) {
+ /* Allocate a record. UUIDs are not NUL-terminated. */
+ se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len +
+ csn->bv_len + 1 );
+ se->se_next = NULL;
+ se->se_tag = op->o_tag;
+
+ se->se_uuid.bv_val = (char *)(se+1);
+ se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len + 1;
+ AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
+ se->se_uuid.bv_len = opc->suuid.bv_len;
+
+ AC_MEMCPY( se->se_csn.bv_val, csn->bv_val, csn->bv_len );
+ se->se_csn.bv_val[csn->bv_len] = '\0';
+ se->se_csn.bv_len = csn->bv_len;
+
+ ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+ if ( sl->sl_head ) {
+ sl->sl_tail->se_next = se;
+ } else {
+ sl->sl_head = se;
+ }
+ sl->sl_tail = se;
+ sl->sl_num++;
+ while ( sl->sl_num > sl->sl_size ) {
+ se = sl->sl_head;
+ sl->sl_head = se->se_next;
+ strcpy( sl->sl_mincsn.bv_val, se->se_csn.bv_val );
+ sl->sl_mincsn.bv_len = se->se_csn.bv_len;
+ ch_free( se );
+ sl->sl_num--;
+ if ( !sl->sl_head ) {
+ sl->sl_tail = NULL;
+ }
+ }
+ ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ }
+}
+
+/* Just set a flag if we found the matching entry */
+static int
+playlog_cb( Operation *op, SlapReply *rs )
+{
+ if ( rs->sr_type == REP_SEARCH ) {
+ op->o_callback->sc_private = (void *)1;
+ }
+ return rs->sr_err;
+}
+
+/* enter with sl->sl_mutex locked, release before returning */
+static void
+syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
+ struct berval *oldcsn, struct berval *ctxcsn )
+{
+ slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
+ syncprov_info_t *si = on->on_bi.bi_private;
+ slog_entry *se;
+ int i, j, ndel, num, nmods, mmods;
+ BerVarray uuids;
+
+ num = sl->sl_num;
+ i = 0;
+ nmods = 0;
+
+ uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) +
+ num * UUID_LEN, op->o_tmpmemctx );
+
+ uuids[0].bv_val = (char *)(uuids + num + 1);
+
+ /* Make a copy of the relevant UUIDs. Put the Deletes up front
+ * and everything else at the end. Do this first so we can
+ * unlock the list mutex.
+ */
+ for ( se=sl->sl_head; se; se=se->se_next ) {
+ if ( ber_bvcmp( &se->se_csn, oldcsn ) < 0 ) continue;
+ if ( ber_bvcmp( &se->se_csn, ctxcsn ) > 0 ) break;
+ if ( se->se_tag == LDAP_REQ_DELETE ) {
+ j = i;
+ i++;
+ } else {
+ nmods++;
+ j = num - nmods;
+ }
+ uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN);
+ AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN);
+ uuids[j].bv_len = UUID_LEN;
+ }
+ ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+
+ ndel = i;
+
+ /* Mods must be validated to see if they belong in this delete set.
+ */
+
+ mmods = nmods;
+ /* Strip any duplicates */
+ for ( i=0; i<nmods; i++ ) {
+ for ( j=0; j<ndel; j++ ) {
+ if ( bvmatch( &uuids[j], &uuids[num - 1 - i] )) {
+ uuids[num - 1 - i].bv_len = 0;
+ mmods --;
+ break;
+ }
+ }
+ if ( uuids[num - 1 - i].bv_len == 0 ) continue;
+ for ( j=0; j<i; j++ ) {
+ if ( bvmatch( &uuids[num - 1 - j], &uuids[num - 1 - i] )) {
+ uuids[num - 1 - i].bv_len = 0;
+ mmods --;
+ break;
+ }
+ }
+ }
+
+ if ( mmods ) {
+ Operation fop;
+ SlapReply frs = { REP_RESULT };
+ int rc;
+ Filter mf, af;
+ AttributeAssertion eq;
+ slap_callback cb = {0};
+
+ fop = *op;
+
+ fop.o_sync_mode = 0;
+ fop.o_callback = &cb;
+ fop.ors_limit = NULL;
+ fop.ors_slimit = 1;
+ fop.ors_tlimit = SLAP_NO_LIMIT;
+ fop.ors_attrs = slap_anlist_all_attributes;
+ fop.ors_attrsonly = 0;
+ fop.o_managedsait = SLAP_CONTROL_CRITICAL;
+
+ af.f_choice = LDAP_FILTER_AND;
+ af.f_next = NULL;
+ af.f_and = &mf;
+ mf.f_choice = LDAP_FILTER_EQUALITY;
+ mf.f_ava = &eq;
+ mf.f_av_desc = slap_schema.si_ad_entryUUID;
+ mf.f_next = fop.ors_filter;
+
+ fop.ors_filter = ⁡
+
+ cb.sc_response = playlog_cb;
+
+ for ( i=0; i<nmods; i++ ) {
+ if ( uuids[num - 1 - 1].bv_len == 0 ) continue;
+
+ mf.f_av_value = uuids[num -1 -i];
+ filter2bv_x( &fop, fop.ors_filter, &fop.ors_filterstr );
+ fop.o_bd->bd_info = on->on_info->oi_orig;
+ cb.sc_private = NULL;
+ rc = fop.o_bd->be_search( &fop, &frs );
+ fop.o_bd->bd_info = (BackendInfo *)on;
+ op->o_tmpfree( fop.ors_filterstr.bv_val, op->o_tmpmemctx );
+
+ /* If entry was not found, add to delete list */
+ if ( !cb.sc_private ) {
+ uuids[ndel++] = uuids[num - 1 - i];
+ }
+ }
+ }
+ if ( ndel ) {
+ uuids[ndel].bv_val = NULL;
+ syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, 0, uuids, 1 );
+ }
+}
+
static int
syncprov_op_response( Operation *op, SlapReply *rs )
{
if ( rs->sr_err == LDAP_SUCCESS )
{
- struct berval maxcsn;
+ struct berval maxcsn = BER_BVNULL, curcsn = BER_BVNULL;
char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
/* Update our context CSN */
cbuf[0] = '\0';
ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
- slap_get_commit_csn( op, &maxcsn );
+ slap_get_commit_csn( op, &maxcsn, &curcsn );
if ( !BER_BVISNULL( &maxcsn ) ) {
strcpy( cbuf, maxcsn.bv_val );
if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn ) > 0 ) {
}
}
+ /* Add any log records */
+ if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) {
+ syncprov_add_slog( op, &curcsn );
+ }
+
}
return SLAP_CB_CONTINUE;
}
avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error );
ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
}
-
- if ( op->o_tag != LDAP_REQ_ADD )
- syncprov_matchops( op, opc, 1 );
}
+ if (( si->si_ops || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
+ syncprov_matchops( op, opc, 1 );
+
+
return SLAP_CB_CONTINUE;
}
typedef struct searchstate {
slap_overinst *ss_on;
syncops *ss_so;
+ int ss_present;
} searchstate;
static int
op->o_tmpmemctx );
rs->sr_ctrls[1] = NULL;
rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls,
- 0, 1, &cookie, LDAP_SYNC_REFRESH_PRESENTS );
+ 0, 1, &cookie, ss->ss_present ? LDAP_SYNC_REFRESH_PRESENTS :
+ LDAP_SYNC_REFRESH_DELETES );
} else {
int locked = 0;
/* It's RefreshAndPersist, transition to Persist phase */
- syncprov_sendinfo( op, rs, rs->sr_nentries ?
+ syncprov_sendinfo( op, rs, ss->ss_present ?
LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
&cookie, 1, NULL, 0 );
/* Flush any queued persist messages */
slap_overinst *on = (slap_overinst *)op->o_bd->bd_info;
syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
slap_callback *cb;
- int gotstate = 0, nochange = 0;
+ int gotstate = 0, nochange = 0, do_present = 1;
Filter *fand, *fava;
syncops *sop = NULL;
searchstate *ss;
sync_control *srs;
+ struct berval ctxcsn;
+ char csnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
}
- /* If we have a cookie, handle the PRESENT lookups
- */
+ /* snapshot the ctxcsn */
+ ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
+ strcpy( csnbuf, si->si_ctxcsnbuf );
+ ctxcsn.bv_len = si->si_ctxcsn.bv_len;
+ ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+ ctxcsn.bv_val = csnbuf;
+
+ /* If we have a cookie, handle the PRESENT lookups */
if ( srs->sr_state.ctxcsn ) {
+ sessionlog *sl;
+
/* Is the CSN in a valid format? */
if ( srs->sr_state.ctxcsn->bv_len >= LDAP_LUTIL_CSNSTR_BUFSIZE ) {
send_ldap_error( op, rs, LDAP_OTHER, "invalid sync cookie" );
return rs->sr_err;
}
/* If just Refreshing and nothing has changed, shortcut it */
- if ( bvmatch( srs->sr_state.ctxcsn, &si->si_ctxcsn )) {
+ if ( bvmatch( srs->sr_state.ctxcsn, &ctxcsn )) {
nochange = 1;
if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
LDAPControl *ctrls[2];
}
goto shortcut;
}
+ /* Do we have a sessionlog for this search? */
+ for ( sl=si->si_logs; sl; sl=sl->sl_next )
+ if ( sl->sl_sid == srs->sr_state.sid ) break;
+ if ( sl ) {
+ ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+ if ( ber_bvcmp( srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) {
+ do_present = 0;
+ /* mutex is unlocked in playlog */
+ syncprov_playlog( op, rs, sl, srs->sr_state.ctxcsn, &ctxcsn );
+ } else {
+ ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+ }
+ }
/* Is the CSN still present in the database? */
if ( syncprov_findcsn( op, FIND_CSN ) != LDAP_SUCCESS ) {
/* No, so a reload is required */
#endif
} else {
gotstate = 1;
- /* If context has changed, check for Present UUIDs */
- if ( syncprov_findcsn( op, FIND_PRESENT ) != LDAP_SUCCESS ) {
+ /* If changed and doing Present lookup, send Present UUIDs */
+ if ( do_present && syncprov_findcsn( op, FIND_PRESENT ) !=
+ LDAP_SUCCESS ) {
send_ldap_result( op, rs );
return rs->sr_err;
}
fava->f_choice = LDAP_FILTER_LE;
fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
- ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
- ber_dupbv_x( &fava->f_ava->aa_value, &si->si_ctxcsn, op->o_tmpmemctx );
- ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+ ber_dupbv_x( &fava->f_ava->aa_value, &ctxcsn, op->o_tmpmemctx );
fand->f_and = fava;
if ( gotstate ) {
fava->f_next = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
ss = (searchstate *)(cb+1);
ss->ss_on = on;
ss->ss_so = sop;
+ ss->ss_present = do_present;
cb->sc_response = syncprov_search_response;
cb->sc_cleanup = syncprov_search_cleanup;
cb->sc_private = ss;
si->si_chktime = atoi( argv[2] ) * 60;
return 0;
+ } else if ( strcasecmp( argv[0], "syncprov-sessionlog" ) == 0 ) {
+ sessionlog *sl;
+ int sid, size;
+ if ( argc != 3 ) {
+ fprintf( stderr, "%s: line %d: wrong number of arguments in "
+ "\"syncprov-sessionlog <sid> <size>\"\n", fname, lineno );
+ return -1;
+ }
+ sid = atoi( argv[1] );
+ if ( sid < 0 || sid > 999 ) {
+ fprintf( stderr,
+ "%s: line %d: session log id %d is out of range [0..999]\n",
+ fname, lineno, sid );
+ return -1;
+ }
+ size = atoi( argv[2] );
+ if ( size < 0 ) {
+ fprintf( stderr,
+ "%s: line %d: session log size %d is negative\n",
+ fname, lineno, size );
+ return -1;
+ }
+ for ( sl = si->si_logs; sl; sl=sl->sl_next ) {
+ if ( sl->sl_sid == sid ) {
+ sl->sl_size = size;
+ break;
+ }
+ }
+ if ( !sl ) {
+ sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE );
+ sl->sl_mincsn.bv_val = (char *)(sl+1);
+ sl->sl_mincsn.bv_len = 0;
+ sl->sl_sid = sid;
+ sl->sl_size = size;
+ sl->sl_num = 0;
+ sl->sl_head = sl->sl_tail = NULL;
+ sl->sl_next = si->si_logs;
+ ldap_pvt_thread_mutex_init( &sl->sl_mutex );
+ si->si_logs = sl;
+ }
+ return 0;
}
return SLAP_CONF_UNKNOWN;