From: Howard Chu Date: Sat, 27 Nov 2004 14:52:28 +0000 (+0000) Subject: Add checkpointing to save contextCSN periodically. Read contextCSN X-Git-Tag: OPENLDAP_REL_ENG_2_3_0ALPHA~198 X-Git-Url: https://git.sur5r.net/?a=commitdiff_plain;h=865f0db180001881a54526202353f26f3761e25a;p=openldap Add checkpointing to save contextCSN periodically. Read contextCSN on startup, save on shutdown. --- diff --git a/servers/slapd/overlays/syncprov.c b/servers/slapd/overlays/syncprov.c index c079eb827c..27bf28e090 100644 --- a/servers/slapd/overlays/syncprov.c +++ b/servers/slapd/overlays/syncprov.c @@ -94,7 +94,10 @@ typedef struct syncmatches { typedef struct syncprov_info_t { syncops *si_ops; struct berval si_ctxcsn; /* ldapsync context */ - int si_gotcsn; /* is our ctxcsn up to date? */ + int si_chkops; /* checkpointing info */ + int si_chktime; + int si_numops; /* number of ops since last checkpoint */ + time_t si_chklast; /* time of last checkpoint */ Avlnode *si_mods; /* entries being modified */ ldap_pvt_thread_mutex_t si_csn_mutex; ldap_pvt_thread_mutex_t si_ops_mutex; @@ -471,61 +474,26 @@ syncprov_findbase( Operation *op, fbase_cookie *fc ) } /* syncprov_findcsn: - * This function has three different purposes, but they all use a search + * This function has two different purposes, but they both use a search * that filters on entryCSN so they're combined here. - * 1: when the current contextCSN is unknown (i.e., at server start time) - * and a syncrepl search has arrived with a cookie, we search for all entries - * with CSN >= the cookie CSN, and store the maximum as our contextCSN. Also, - * we expect to find the cookie CSN in the search results, and note if we did - * or not. If not, we assume the cookie is stale. (This may be too restrictive, - * notice case 2.) - * - * 2: when the current contextCSN is known and we have a sync cookie, we search + * 1: when the current contextCSN is known and we have a sync cookie, we search * for one entry with CSN <= the cookie CSN. (Used to search for =.) If an - * entry is found, the cookie CSN is valid, otherwise it is stale. Case 1 is - * considered a special case of case 2, and both are generally called the - * "find CSN" task. + * entry is found, the cookie CSN is valid, otherwise it is stale. * - * 3: during a refresh phase, we search for all entries with CSN <= the cookie + * 2: during a refresh phase, we search for all entries with CSN <= the cookie * CSN, and generate Present records for them. We always collect this result * in SyncID sets, even if there's only one match. */ #define FIND_CSN 1 #define FIND_PRESENT 2 -typedef struct fcsn_cookie { - struct berval maxcsn; - int gotmatch; -} fcsn_cookie; - static int findcsn_cb( Operation *op, SlapReply *rs ) { slap_callback *sc = op->o_callback; if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) { - /* If the private pointer is set, it points to an fcsn_cookie - * and we want to record the maxcsn and match state. - */ - if ( sc->sc_private ) { - int i; - fcsn_cookie *fc = sc->sc_private; - sync_control *srs = op->o_controls[sync_cid]; - Attribute *a = attr_find(rs->sr_entry->e_attrs, - slap_schema.si_ad_entryCSN ); - i = ber_bvcmp( &a->a_vals[0], srs->sr_state.ctxcsn ); - if ( i == 0 ) fc->gotmatch = 1; - i = ber_bvcmp( &a->a_vals[0], &fc->maxcsn ); - if ( i > 0 ) { - fc->maxcsn.bv_len = a->a_vals[0].bv_len; - strcpy(fc->maxcsn.bv_val, a->a_vals[0].bv_val ); - } - } else { - /* Otherwise, if the private pointer is not set, we just - * want to know if any entry matched the filter. - */ - sc->sc_private = (void *)1; - } + sc->sc_private = (void *)1; } return LDAP_SUCCESS; } @@ -588,7 +556,6 @@ syncprov_findcsn( Operation *op, int mode ) Filter cf; AttributeAssertion eq; int rc; - fcsn_cookie fcookie; fpres_cookie pcookie; int locked = 0; sync_control *srs = op->o_controls[sync_cid]; @@ -602,37 +569,13 @@ syncprov_findcsn( Operation *op, int mode ) fbuf.bv_val = buf; if ( mode == FIND_CSN ) { - if ( !si->si_gotcsn ) { - /* If we don't know the current ctxcsn, find it */ - ldap_pvt_thread_mutex_lock( &si->si_csn_mutex ); - locked = 1; - } - if ( !si->si_gotcsn ) { - cf.f_choice = LDAP_FILTER_GE; - fop.ors_attrsonly = 0; - fop.ors_attrs = csn_anlist; - fop.ors_slimit = SLAP_NO_LIMIT; - cb.sc_private = &fcookie; - fcookie.maxcsn.bv_val = cbuf; - fcookie.maxcsn.bv_len = 0; - fcookie.gotmatch = 0; - fbuf.bv_len = sprintf( buf, "(entryCSN>=%s)", srs->sr_state.ctxcsn->bv_val ); - } else { - if ( locked ) { - ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); - locked = 0; - } - cf.f_choice = LDAP_FILTER_LE; - fop.ors_attrsonly = 1; - fop.ors_attrs = slap_anlist_no_attrs; - fop.ors_slimit = 1; - cb.sc_private = NULL; - fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", srs->sr_state.ctxcsn->bv_val ); - } + fop.ors_attrsonly = 1; + fop.ors_attrs = slap_anlist_no_attrs; + fop.ors_slimit = 1; + cb.sc_private = NULL; cb.sc_response = findcsn_cb; } else if ( mode == FIND_PRESENT ) { - cf.f_choice = LDAP_FILTER_LE; fop.ors_attrsonly = 0; fop.ors_attrs = uuid_anlist; fop.ors_slimit = SLAP_NO_LIMIT; @@ -642,12 +585,14 @@ syncprov_findcsn( Operation *op, int mode ) cb.sc_response = findpres_cb; pcookie.num = 0; pcookie.uuids = NULL; - fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", srs->sr_state.ctxcsn->bv_val ); } + cf.f_choice = LDAP_FILTER_LE; cf.f_ava = &eq; cf.f_av_desc = slap_schema.si_ad_entryCSN; cf.f_av_value = *srs->sr_state.ctxcsn; cf.f_next = NULL; + fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", + srs->sr_state.ctxcsn->bv_val ); fop.o_callback = &cb; fop.ors_tlimit = SLAP_NO_LIMIT; @@ -659,16 +604,7 @@ syncprov_findcsn( Operation *op, int mode ) fop.o_bd->bd_info = (BackendInfo *)on; if ( mode == FIND_CSN ) { - if ( !si->si_gotcsn ) { - strcpy(si->si_ctxcsnbuf, fcookie.maxcsn.bv_val); - si->si_ctxcsn.bv_len = fcookie.maxcsn.bv_len; - si->si_gotcsn = 1; - ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); - if ( fcookie.gotmatch ) return LDAP_SUCCESS; - - } else { - if ( cb.sc_private ) return LDAP_SUCCESS; - } + if ( cb.sc_private ) return LDAP_SUCCESS; } else if ( mode == FIND_PRESENT ) { return LDAP_SUCCESS; } @@ -994,6 +930,36 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs ) op->o_tmpfree(cb, op->o_tmpmemctx); } +static void +syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on ) +{ + syncprov_info_t *si = on->on_bi.bi_private; + Modifications mod; + Operation opm; + struct berval bv[2]; + BackendInfo *orig; + slap_callback cb = {0}; + + mod.sml_values = bv; + bv[1].bv_val = NULL; + bv[0] = si->si_ctxcsn; + mod.sml_nvalues = NULL; + mod.sml_desc = slap_schema.si_ad_contextCSN; + mod.sml_op = LDAP_MOD_REPLACE; + mod.sml_next = NULL; + + cb.sc_response = slap_null_cb; + opm = *op; + opm.o_tag = LDAP_REQ_MODIFY; + opm.o_callback = &cb; + opm.orm_modlist = &mod; + opm.o_req_dn = op->o_bd->be_suffix[0]; + opm.o_req_ndn = op->o_bd->be_nsuffix[0]; + orig = opm.o_bd->bd_info; + opm.o_bd->bd_info = on->on_info->oi_orig; + opm.o_bd->be_modify( &opm, rs ); +} + static int syncprov_op_response( Operation *op, SlapReply *rs ) { @@ -1017,7 +983,23 @@ syncprov_op_response( Operation *op, SlapReply *rs ) strcpy( si->si_ctxcsnbuf, cbuf ); si->si_ctxcsn.bv_len = maxcsn.bv_len; } - si->si_gotcsn = 1; + } + + si->si_numops++; + if ( si->si_chkops || si->si_chktime ) { + int do_check=0; + if ( si->si_chkops && si->si_numops >= si->si_chkops ) { + do_check = 1; + si->si_numops = 0; + } + if ( si->si_chktime && + (op->o_time - si->si_chklast >= si->si_chktime )) { + do_check = 1; + si->si_chklast = op->o_time; + } + if ( do_check ) { + syncprov_checkpoint( op, rs, on ); + } } ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); @@ -1467,18 +1449,6 @@ syncprov_op_search( Operation *op, SlapReply *rs ) } } - /* If we didn't get a cookie and we don't know our contextcsn, try to - * find it anyway. - */ - if ( !gotstate && !si->si_gotcsn ) { - struct berval bv = BER_BVC("1"), *old; - - old = srs->sr_state.ctxcsn; - srs->sr_state.ctxcsn = &bv; - syncprov_findcsn( op, FIND_CSN ); - srs->sr_state.ctxcsn = old; - } - /* Append CSN range to search filter, save original filter * for persistent search evaluation */ @@ -1547,49 +1517,36 @@ syncprov_operational( if ( rs->sr_entry && dn_match( &rs->sr_entry->e_nname, op->o_bd->be_nsuffix )) { - Attribute **ap; - - for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next ) ; - if ( SLAP_OPATTRS( rs->sr_attr_flags ) || ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) { + Attribute *a, **ap = NULL; + - Attribute *a = ch_malloc( sizeof(Attribute)); - a->a_desc = slap_schema.si_ad_contextCSN; - a->a_vals = ch_malloc( 2 * sizeof(struct berval)); - -#if 0 /* causes a deadlock */ - if ( !si->si_gotcsn ) { - sync_control sc, *old; - void *ctrls[SLAP_MAX_CIDS]; - struct berval bv = BER_BVC("1"); - - if ( !op->o_controls ) { - memset(ctrls, 0, sizeof(ctrls)); - op->o_controls = ctrls; - } else { - old = op->o_controls[sync_cid]; - } - op->o_controls[sync_cid] = ≻ - sc.sr_state.ctxcsn = &bv; - syncprov_findcsn( op, FIND_CSN ); - if ( op->o_controls == ctrls ) { - op->o_controls = NULL; - } else { - op->o_controls[sync_cid] = old; - } + for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) { + if ( a->a_desc == slap_schema.si_ad_contextCSN ) + break; } -#endif - ldap_pvt_thread_mutex_lock( &si->si_csn_mutex ); - ber_dupbv( &a->a_vals[0], &si->si_ctxcsn ); - ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); - a->a_vals[1].bv_val = NULL; - a->a_nvals = a->a_vals; - a->a_next = NULL; - a->a_flags = 0; + if ( !a ) { + for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next ); + + a = ch_malloc( sizeof(Attribute)); + a->a_desc = slap_schema.si_ad_contextCSN; + a->a_vals = ch_malloc( 2 * sizeof(struct berval)); + a->a_vals[1].bv_val = NULL; + a->a_nvals = a->a_vals; + a->a_next = NULL; + a->a_flags = 0; + *ap = a; + } - *ap = a; + ldap_pvt_thread_mutex_lock( &si->si_csn_mutex ); + if ( !ap ) { + strcpy( a->a_vals[0].bv_val, si->si_ctxcsnbuf ); + } else { + ber_dupbv( &a->a_vals[0], &si->si_ctxcsn ); + } + ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex ); } } return LDAP_SUCCESS; @@ -1625,6 +1582,95 @@ syncprov_db_config( return SLAP_CONF_UNKNOWN; } +/* Cheating - we have no thread pool context for these functions, + * so make one. + */ +typedef struct thread_keys { + void *key; + void *data; + ldap_pvt_thread_pool_keyfree_t *free; +} thread_keys; + +/* A fake thread context */ +static thread_keys thrctx[8]; + +/* Read any existing contextCSN from the underlying db. + * Then search for any entries newer than that. If no value exists, + * just generate it. Cache whatever result. + */ +static int +syncprov_db_open( + BackendDB *be +) +{ + slap_overinst *on = (slap_overinst *) be->bd_info; + syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; + + char opbuf[OPERATION_BUFFER_SIZE]; + Operation *op = (Operation *)opbuf; + Entry *e; + Attribute *a; + int rc; + + memset(opbuf, 0, sizeof(opbuf)); + op->o_hdr = (Opheader *)(op+1); + op->o_bd = be; + op->o_dn = be->be_rootdn; + op->o_ndn = be->be_rootndn; + op->o_threadctx = thrctx; + op->o_tmpmfuncs = &ch_mfuncs; + + op->o_bd->bd_info = on->on_info->oi_orig; + rc = be_entry_get_rw( op, be->be_nsuffix, NULL, + slap_schema.si_ad_contextCSN, 0, &e ); + + if ( e ) { + a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN ); + if ( a ) { + si->si_ctxcsn.bv_len = a->a_nvals[0].bv_len; + if ( si->si_ctxcsn.bv_len >= sizeof(si->si_ctxcsnbuf )) + si->si_ctxcsn.bv_len = sizeof(si->si_ctxcsnbuf)-1; + strncpy( si->si_ctxcsnbuf, a->a_nvals[0].bv_val, + si->si_ctxcsn.bv_len ); + si->si_ctxcsnbuf[si->si_ctxcsn.bv_len] = '\0'; + } + be_entry_release_r( op, e ); + } + op->o_bd->bd_info = (BackendInfo *)on; + return 0; +} + +/* Write the current contextCSN into the underlying db. + */ +static int +syncprov_db_close( + BackendDB *be +) +{ + slap_overinst *on = (slap_overinst *) be->bd_info; + syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; + int i; + + if ( si->si_numops ) { + Connection conn; + char opbuf[OPERATION_BUFFER_SIZE]; + Operation *op = (Operation *)opbuf; + SlapReply rs = {REP_RESULT}; + + connection_fake_init( &conn, op, thrctx ); + op->o_bd = be; + op->o_dn = be->be_rootdn; + op->o_ndn = be->be_rootndn; + syncprov_checkpoint( op, &rs, on ); + } + for ( i=0; thrctx[i].key; i++) { + if ( thrctx[i].free ) + thrctx[i].free( thrctx[i].key, thrctx[i].data ); + } + + return 0; +} + static int syncprov_db_init( BackendDB *be @@ -1637,6 +1683,7 @@ syncprov_db_init( on->on_bi.bi_private = si; ldap_pvt_thread_mutex_init( &si->si_csn_mutex ); ldap_pvt_thread_mutex_init( &si->si_ops_mutex ); + ldap_pvt_thread_mutex_init( &si->si_mods_mutex ); si->si_ctxcsn.bv_val = si->si_ctxcsnbuf; csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN; @@ -1657,6 +1704,7 @@ syncprov_db_destroy( syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private; if ( si ) { + ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex ); ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex ); ldap_pvt_thread_mutex_destroy( &si->si_csn_mutex ); ch_free( si ); @@ -1790,6 +1838,8 @@ syncprov_init() syncprov.on_bi.bi_db_init = syncprov_db_init; syncprov.on_bi.bi_db_config = syncprov_db_config; syncprov.on_bi.bi_db_destroy = syncprov_db_destroy; + syncprov.on_bi.bi_db_open = syncprov_db_open; + syncprov.on_bi.bi_db_close = syncprov_db_close; syncprov.on_bi.bi_op_abandon = syncprov_op_abandon; syncprov.on_bi.bi_op_cancel = syncprov_op_abandon;