]> git.sur5r.net Git - openldap/commitdiff
Add checkpointing to save contextCSN periodically. Read contextCSN
authorHoward Chu <hyc@openldap.org>
Sat, 27 Nov 2004 14:52:28 +0000 (14:52 +0000)
committerHoward Chu <hyc@openldap.org>
Sat, 27 Nov 2004 14:52:28 +0000 (14:52 +0000)
on startup, save on shutdown.

servers/slapd/overlays/syncprov.c

index c079eb827c244a2ae9c97a853ec0f27308f0c8d0..27bf28e090082e5f88591a2a4a88ff9aad6a4cf8 100644 (file)
@@ -94,7 +94,10 @@ typedef struct syncmatches {
 typedef struct syncprov_info_t {
        syncops         *si_ops;
        struct berval   si_ctxcsn;      /* ldapsync context */
-       int             si_gotcsn;      /* is our ctxcsn up to date? */
+       int             si_chkops;      /* checkpointing info */
+       int             si_chktime;
+       int             si_numops;      /* number of ops since last checkpoint */
+       time_t  si_chklast;     /* time of last checkpoint */
        Avlnode *si_mods;       /* entries being modified */
        ldap_pvt_thread_mutex_t si_csn_mutex;
        ldap_pvt_thread_mutex_t si_ops_mutex;
@@ -471,61 +474,26 @@ syncprov_findbase( Operation *op, fbase_cookie *fc )
 }
 
 /* syncprov_findcsn:
- *   This function has three different purposes, but they all use a search
+ *   This function has two different purposes, but they both use a search
  * that filters on entryCSN so they're combined here.
- * 1: when the current contextCSN is unknown (i.e., at server start time)
- * and a syncrepl search has arrived with a cookie, we search for all entries
- * with CSN >= the cookie CSN, and store the maximum as our contextCSN. Also,
- * we expect to find the cookie CSN in the search results, and note if we did
- * or not. If not, we assume the cookie is stale. (This may be too restrictive,
- * notice case 2.)
- *
- * 2: when the current contextCSN is known and we have a sync cookie, we search
+ * 1: when the current contextCSN is known and we have a sync cookie, we search
  * for one entry with CSN <= the cookie CSN. (Used to search for =.) If an
- * entry is found, the cookie CSN is valid, otherwise it is stale. Case 1 is
- * considered a special case of case 2, and both are generally called the
- * "find CSN" task.
+ * entry is found, the cookie CSN is valid, otherwise it is stale.
  *
- * 3: during a refresh phase, we search for all entries with CSN <= the cookie
+ * 2: during a refresh phase, we search for all entries with CSN <= the cookie
  * CSN, and generate Present records for them. We always collect this result
  * in SyncID sets, even if there's only one match.
  */
 #define        FIND_CSN        1
 #define        FIND_PRESENT    2
 
-typedef struct fcsn_cookie {
-       struct berval maxcsn;
-       int gotmatch;
-} fcsn_cookie;
-
 static int
 findcsn_cb( Operation *op, SlapReply *rs )
 {
        slap_callback *sc = op->o_callback;
 
        if ( rs->sr_type == REP_SEARCH && rs->sr_err == LDAP_SUCCESS ) {
-               /* If the private pointer is set, it points to an fcsn_cookie
-                * and we want to record the maxcsn and match state.
-                */
-               if ( sc->sc_private ) {
-                       int i;
-                       fcsn_cookie *fc = sc->sc_private;
-                       sync_control *srs = op->o_controls[sync_cid];
-                       Attribute *a = attr_find(rs->sr_entry->e_attrs,
-                               slap_schema.si_ad_entryCSN );
-                       i = ber_bvcmp( &a->a_vals[0], srs->sr_state.ctxcsn );
-                       if ( i == 0 ) fc->gotmatch = 1;
-                       i = ber_bvcmp( &a->a_vals[0], &fc->maxcsn );
-                       if ( i > 0 ) {
-                               fc->maxcsn.bv_len = a->a_vals[0].bv_len;
-                               strcpy(fc->maxcsn.bv_val, a->a_vals[0].bv_val );
-                       }
-               } else {
-               /* Otherwise, if the private pointer is not set, we just
-                * want to know if any entry matched the filter.
-                */
-                       sc->sc_private = (void *)1;
-               }
+               sc->sc_private = (void *)1;
        }
        return LDAP_SUCCESS;
 }
@@ -588,7 +556,6 @@ syncprov_findcsn( Operation *op, int mode )
        Filter cf;
        AttributeAssertion eq;
        int rc;
-       fcsn_cookie fcookie;
        fpres_cookie pcookie;
        int locked = 0;
        sync_control *srs = op->o_controls[sync_cid];
@@ -602,37 +569,13 @@ syncprov_findcsn( Operation *op, int mode )
 
        fbuf.bv_val = buf;
        if ( mode == FIND_CSN ) {
-               if ( !si->si_gotcsn ) {
-                       /* If we don't know the current ctxcsn, find it */
-                       ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
-                       locked = 1;
-               }
-               if ( !si->si_gotcsn ) {
-                       cf.f_choice = LDAP_FILTER_GE;
-                       fop.ors_attrsonly = 0;
-                       fop.ors_attrs = csn_anlist;
-                       fop.ors_slimit = SLAP_NO_LIMIT;
-                       cb.sc_private = &fcookie;
-                       fcookie.maxcsn.bv_val = cbuf;
-                       fcookie.maxcsn.bv_len = 0;
-                       fcookie.gotmatch = 0;
-                       fbuf.bv_len = sprintf( buf, "(entryCSN>=%s)", srs->sr_state.ctxcsn->bv_val );
-               } else {
-                       if ( locked ) {
-                               ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
-                               locked = 0;
-                       }
-                       cf.f_choice = LDAP_FILTER_LE;
-                       fop.ors_attrsonly = 1;
-                       fop.ors_attrs = slap_anlist_no_attrs;
-                       fop.ors_slimit = 1;
-                       cb.sc_private = NULL;
-                       fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", srs->sr_state.ctxcsn->bv_val );
-               }
+               fop.ors_attrsonly = 1;
+               fop.ors_attrs = slap_anlist_no_attrs;
+               fop.ors_slimit = 1;
+               cb.sc_private = NULL;
                cb.sc_response = findcsn_cb;
 
        } else if ( mode == FIND_PRESENT ) {
-               cf.f_choice = LDAP_FILTER_LE;
                fop.ors_attrsonly = 0;
                fop.ors_attrs = uuid_anlist;
                fop.ors_slimit = SLAP_NO_LIMIT;
@@ -642,12 +585,14 @@ syncprov_findcsn( Operation *op, int mode )
                cb.sc_response = findpres_cb;
                pcookie.num = 0;
                pcookie.uuids = NULL;
-               fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)", srs->sr_state.ctxcsn->bv_val );
        }
+       cf.f_choice = LDAP_FILTER_LE;
        cf.f_ava = &eq;
        cf.f_av_desc = slap_schema.si_ad_entryCSN;
        cf.f_av_value = *srs->sr_state.ctxcsn;
        cf.f_next = NULL;
+       fbuf.bv_len = sprintf( buf, "(entryCSN<=%s)",
+               srs->sr_state.ctxcsn->bv_val );
 
        fop.o_callback = &cb;
        fop.ors_tlimit = SLAP_NO_LIMIT;
@@ -659,16 +604,7 @@ syncprov_findcsn( Operation *op, int mode )
        fop.o_bd->bd_info = (BackendInfo *)on;
 
        if ( mode == FIND_CSN ) {
-               if ( !si->si_gotcsn ) {
-                       strcpy(si->si_ctxcsnbuf, fcookie.maxcsn.bv_val);
-                       si->si_ctxcsn.bv_len = fcookie.maxcsn.bv_len;
-                       si->si_gotcsn = 1;
-                       ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
-                       if ( fcookie.gotmatch ) return LDAP_SUCCESS;
-                       
-               } else {
-                       if ( cb.sc_private ) return LDAP_SUCCESS;
-               }
+               if ( cb.sc_private ) return LDAP_SUCCESS;
        } else if ( mode == FIND_PRESENT ) {
                return LDAP_SUCCESS;
        }
@@ -994,6 +930,36 @@ syncprov_op_cleanup( Operation *op, SlapReply *rs )
        op->o_tmpfree(cb, op->o_tmpmemctx);
 }
 
+static void
+syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
+{
+       syncprov_info_t         *si = on->on_bi.bi_private;
+       Modifications mod;
+       Operation opm;
+       struct berval bv[2];
+       BackendInfo *orig;
+       slap_callback cb = {0};
+
+       mod.sml_values = bv;
+       bv[1].bv_val = NULL;
+       bv[0] = si->si_ctxcsn;
+       mod.sml_nvalues = NULL;
+       mod.sml_desc = slap_schema.si_ad_contextCSN;
+       mod.sml_op = LDAP_MOD_REPLACE;
+       mod.sml_next = NULL;
+
+       cb.sc_response = slap_null_cb;
+       opm = *op;
+       opm.o_tag = LDAP_REQ_MODIFY;
+       opm.o_callback = &cb;
+       opm.orm_modlist = &mod;
+       opm.o_req_dn = op->o_bd->be_suffix[0];
+       opm.o_req_ndn = op->o_bd->be_nsuffix[0];
+       orig = opm.o_bd->bd_info;
+       opm.o_bd->bd_info = on->on_info->oi_orig;
+       opm.o_bd->be_modify( &opm, rs );
+}
+
 static int
 syncprov_op_response( Operation *op, SlapReply *rs )
 {
@@ -1017,7 +983,23 @@ syncprov_op_response( Operation *op, SlapReply *rs )
                                strcpy( si->si_ctxcsnbuf, cbuf );
                                si->si_ctxcsn.bv_len = maxcsn.bv_len;
                        }
-                       si->si_gotcsn = 1;
+               }
+
+               si->si_numops++;
+               if ( si->si_chkops || si->si_chktime ) {
+                       int do_check=0;
+                       if ( si->si_chkops && si->si_numops >= si->si_chkops ) {
+                               do_check = 1;
+                               si->si_numops = 0;
+                       }
+                       if ( si->si_chktime && 
+                               (op->o_time - si->si_chklast >= si->si_chktime )) {
+                               do_check = 1;
+                               si->si_chklast = op->o_time;
+                       }
+                       if ( do_check ) {
+                               syncprov_checkpoint( op, rs, on );
+                       }
                }
                ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
 
@@ -1467,18 +1449,6 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                }
        }
 
-       /* If we didn't get a cookie and we don't know our contextcsn, try to
-        * find it anyway.
-        */
-       if ( !gotstate && !si->si_gotcsn ) {
-               struct berval bv = BER_BVC("1"), *old;
-               
-               old = srs->sr_state.ctxcsn;
-               srs->sr_state.ctxcsn = &bv;
-               syncprov_findcsn( op, FIND_CSN );
-               srs->sr_state.ctxcsn = old;
-       }
-
        /* Append CSN range to search filter, save original filter
         * for persistent search evaluation
         */
@@ -1547,49 +1517,36 @@ syncprov_operational(
        if ( rs->sr_entry &&
                dn_match( &rs->sr_entry->e_nname, op->o_bd->be_nsuffix )) {
 
-               Attribute **ap;
-
-               for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next ) ;
-
                if ( SLAP_OPATTRS( rs->sr_attr_flags ) ||
                        ad_inlist( slap_schema.si_ad_contextCSN, rs->sr_attrs )) {
+                       Attribute *a, **ap = NULL;
+
                        
-                       Attribute *a = ch_malloc( sizeof(Attribute));
-                       a->a_desc = slap_schema.si_ad_contextCSN;
-                       a->a_vals = ch_malloc( 2 * sizeof(struct berval));
-
-#if 0  /* causes a deadlock */
-                       if ( !si->si_gotcsn ) {
-                               sync_control sc, *old;
-                               void *ctrls[SLAP_MAX_CIDS];
-                               struct berval bv = BER_BVC("1");
-               
-                               if ( !op->o_controls ) {
-                                       memset(ctrls, 0, sizeof(ctrls));
-                                       op->o_controls = ctrls;
-                               } else {
-                                       old = op->o_controls[sync_cid];
-                               }
-                               op->o_controls[sync_cid] = &sc;
-                               sc.sr_state.ctxcsn = &bv;
-                               syncprov_findcsn( op, FIND_CSN );
-                               if ( op->o_controls == ctrls ) {
-                                       op->o_controls = NULL;
-                               } else {
-                                       op->o_controls[sync_cid] = old;
-                               }
+                       for ( a=rs->sr_entry->e_attrs; a; a=a->a_next ) {
+                               if ( a->a_desc == slap_schema.si_ad_contextCSN )
+                                       break;
                        }
-#endif
-                       ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
-                       ber_dupbv( &a->a_vals[0], &si->si_ctxcsn );
-                       ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
 
-                       a->a_vals[1].bv_val = NULL;
-                       a->a_nvals = a->a_vals;
-                       a->a_next = NULL;
-                       a->a_flags = 0;
+                       if ( !a ) {
+                               for ( ap = &rs->sr_operational_attrs; *ap; ap=&(*ap)->a_next );
+
+                               a = ch_malloc( sizeof(Attribute));
+                               a->a_desc = slap_schema.si_ad_contextCSN;
+                               a->a_vals = ch_malloc( 2 * sizeof(struct berval));
+                               a->a_vals[1].bv_val = NULL;
+                               a->a_nvals = a->a_vals;
+                               a->a_next = NULL;
+                               a->a_flags = 0;
+                               *ap = a;
+                       }
 
-                       *ap = a;
+                       ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
+                       if ( !ap ) {
+                               strcpy( a->a_vals[0].bv_val, si->si_ctxcsnbuf );
+                       } else {
+                               ber_dupbv( &a->a_vals[0], &si->si_ctxcsn );
+                       }
+                       ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
                }
        }
        return LDAP_SUCCESS;
@@ -1625,6 +1582,95 @@ syncprov_db_config(
        return SLAP_CONF_UNKNOWN;
 }
 
+/* Cheating - we have no thread pool context for these functions,
+ * so make one.
+ */
+typedef struct thread_keys {
+       void *key;
+       void *data;
+       ldap_pvt_thread_pool_keyfree_t *free;
+} thread_keys;
+
+/* A fake thread context */
+static thread_keys thrctx[8];
+
+/* Read any existing contextCSN from the underlying db.
+ * Then search for any entries newer than that. If no value exists,
+ * just generate it. Cache whatever result.
+ */
+static int
+syncprov_db_open(
+    BackendDB *be
+)
+{
+    slap_overinst   *on = (slap_overinst *) be->bd_info;
+    syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
+
+       char opbuf[OPERATION_BUFFER_SIZE];
+       Operation *op = (Operation *)opbuf;
+       Entry *e;
+       Attribute *a;
+       int rc;
+
+       memset(opbuf, 0, sizeof(opbuf));
+       op->o_hdr = (Opheader *)(op+1);
+       op->o_bd = be;
+       op->o_dn = be->be_rootdn;
+       op->o_ndn = be->be_rootndn;
+       op->o_threadctx = thrctx;
+       op->o_tmpmfuncs = &ch_mfuncs;
+
+       op->o_bd->bd_info = on->on_info->oi_orig;
+       rc = be_entry_get_rw( op, be->be_nsuffix, NULL,
+               slap_schema.si_ad_contextCSN, 0, &e );
+
+       if ( e ) {
+               a = attr_find( e->e_attrs, slap_schema.si_ad_contextCSN );
+               if ( a ) {
+                       si->si_ctxcsn.bv_len = a->a_nvals[0].bv_len;
+                       if ( si->si_ctxcsn.bv_len >= sizeof(si->si_ctxcsnbuf ))
+                               si->si_ctxcsn.bv_len = sizeof(si->si_ctxcsnbuf)-1;
+                       strncpy( si->si_ctxcsnbuf, a->a_nvals[0].bv_val,
+                               si->si_ctxcsn.bv_len );
+                       si->si_ctxcsnbuf[si->si_ctxcsn.bv_len] = '\0';
+               }
+               be_entry_release_r( op, e );
+       }
+       op->o_bd->bd_info = (BackendInfo *)on;
+    return 0;
+}
+
+/* Write the current contextCSN into the underlying db.
+ */
+static int
+syncprov_db_close(
+    BackendDB *be
+)
+{
+    slap_overinst   *on = (slap_overinst *) be->bd_info;
+    syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
+       int i;
+
+       if ( si->si_numops ) {
+               Connection conn;
+               char opbuf[OPERATION_BUFFER_SIZE];
+               Operation *op = (Operation *)opbuf;
+               SlapReply rs = {REP_RESULT};
+
+               connection_fake_init( &conn, op, thrctx );
+               op->o_bd = be;
+               op->o_dn = be->be_rootdn;
+               op->o_ndn = be->be_rootndn;
+               syncprov_checkpoint( op, &rs, on );
+       }
+       for ( i=0; thrctx[i].key; i++) {
+               if ( thrctx[i].free )
+                       thrctx[i].free( thrctx[i].key, thrctx[i].data );
+       }
+
+    return 0;
+}
+
 static int
 syncprov_db_init(
        BackendDB *be
@@ -1637,6 +1683,7 @@ syncprov_db_init(
        on->on_bi.bi_private = si;
        ldap_pvt_thread_mutex_init( &si->si_csn_mutex );
        ldap_pvt_thread_mutex_init( &si->si_ops_mutex );
+       ldap_pvt_thread_mutex_init( &si->si_mods_mutex );
        si->si_ctxcsn.bv_val = si->si_ctxcsnbuf;
 
        csn_anlist[0].an_desc = slap_schema.si_ad_entryCSN;
@@ -1657,6 +1704,7 @@ syncprov_db_destroy(
        syncprov_info_t *si = (syncprov_info_t *)on->on_bi.bi_private;
 
        if ( si ) {
+               ldap_pvt_thread_mutex_destroy( &si->si_mods_mutex );
                ldap_pvt_thread_mutex_destroy( &si->si_ops_mutex );
                ldap_pvt_thread_mutex_destroy( &si->si_csn_mutex );
                ch_free( si );
@@ -1790,6 +1838,8 @@ syncprov_init()
        syncprov.on_bi.bi_db_init = syncprov_db_init;
        syncprov.on_bi.bi_db_config = syncprov_db_config;
        syncprov.on_bi.bi_db_destroy = syncprov_db_destroy;
+       syncprov.on_bi.bi_db_open = syncprov_db_open;
+       syncprov.on_bi.bi_db_close = syncprov_db_close;
 
        syncprov.on_bi.bi_op_abandon = syncprov_op_abandon;
        syncprov.on_bi.bi_op_cancel = syncprov_op_abandon;