]> git.sur5r.net Git - openldap/commitdiff
Added sessionlog support. consumer needs work...
authorHoward Chu <hyc@openldap.org>
Wed, 8 Dec 2004 00:47:24 +0000 (00:47 +0000)
committerHoward Chu <hyc@openldap.org>
Wed, 8 Dec 2004 00:47:24 +0000 (00:47 +0000)
servers/slapd/overlays/syncprov.c
servers/slapd/slap.h

index 5847451ea2e7dc4b93dfba5d52afc8400ec9bd55..a751f477a2c5389d06e90cd3dac05c73ca79305e 100644 (file)
@@ -56,8 +56,8 @@ typedef struct syncops {
        struct berval   s_base;         /* ndn of search base */
        ID              s_eid;          /* entryID of search base */
        Operation       *s_op;          /* search op */
-       long    s_sid;
-       long    s_rid;
+       int             s_sid;
+       int             s_rid;
        struct berval s_filterstr;
        int             s_flags;        /* search status */
        int             s_inuse;        /* reference count */
@@ -91,6 +91,25 @@ typedef struct syncmatches {
        syncops *sm_op;
 } syncmatches;
 
+/* Session log data */
+typedef struct slog_entry {
+       struct slog_entry *se_next;
+       struct berval se_uuid;
+       struct berval se_csn;
+       ber_tag_t       se_tag;
+} slog_entry;
+
+typedef struct sessionlog {
+       struct sessionlog *sl_next;
+       int             sl_sid;
+       struct berval   sl_mincsn;
+       int             sl_num;
+       int             sl_size;
+       slog_entry *sl_head;
+       slog_entry *sl_tail;
+       ldap_pvt_thread_mutex_t sl_mutex;
+} sessionlog;
+
 /* The main state for this overlay */
 typedef struct syncprov_info_t {
        syncops         *si_ops;
@@ -100,6 +119,7 @@ typedef struct syncprov_info_t {
        int             si_numops;      /* number of ops since last checkpoint */
        time_t  si_chklast;     /* time of last checkpoint */
        Avlnode *si_mods;       /* entries being modified */
+       sessionlog      *si_logs;
        ldap_pvt_thread_mutex_t si_csn_mutex;
        ldap_pvt_thread_mutex_t si_ops_mutex;
        ldap_pvt_thread_mutex_t si_mods_mutex;
@@ -539,11 +559,12 @@ findpres_cb( Operation *op, SlapReply *rs )
 {
        slap_callback *sc = op->o_callback;
        fpres_cookie *pc = sc->sc_private;
+       Attribute *a;
        int ret = SLAP_CB_CONTINUE;
 
-       if ( rs->sr_type == REP_SEARCH ) {
-               Attribute *a = attr_find( rs->sr_entry->e_attrs,
-                       slap_schema.si_ad_entryUUID );
+       switch ( rs->sr_type ) {
+       case REP_SEARCH:
+               a = attr_find( rs->sr_entry->e_attrs, slap_schema.si_ad_entryUUID );
                if ( a ) {
                        pc->uuids[pc->num].bv_val = pc->last;
                        AC_MEMCPY( pc->uuids[pc->num].bv_val, a->a_nvals[0].bv_val,
@@ -553,15 +574,10 @@ findpres_cb( Operation *op, SlapReply *rs )
                        pc->uuids[pc->num].bv_val = NULL;
                }
                ret = LDAP_SUCCESS;
-               if ( pc->num == SLAP_SYNCUUID_SET_SIZE ) {
-                       ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
-                               0, pc->uuids, 0 );
-                       pc->uuids[pc->num].bv_val = pc->last;
-                       pc->num = 0;
-                       pc->last = pc->uuids[0].bv_val;
-               }
-
-       } else if ( rs->sr_type == REP_RESULT ) {
+               if ( pc->num != SLAP_SYNCUUID_SET_SIZE )
+                       break;
+               /* FALLTHRU */
+       case REP_RESULT:
                ret = rs->sr_err;
                if ( pc->num ) {
                        ret = syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL,
@@ -570,6 +586,9 @@ findpres_cb( Operation *op, SlapReply *rs )
                        pc->num = 0;
                        pc->last = pc->uuids[0].bv_val;
                }
+               break;
+       default:
+               break;
        }
        return ret;
 }
@@ -1056,6 +1075,183 @@ syncprov_checkpoint( Operation *op, SlapReply *rs, slap_overinst *on )
        opm.o_bd->be_modify( &opm, rs );
 }
 
+static void
+syncprov_add_slog( Operation *op, struct berval *csn )
+{
+       opcookie *opc = op->o_callback->sc_private;
+       slap_overinst *on = opc->son;
+       syncprov_info_t         *si = on->on_bi.bi_private;
+       sessionlog *sl;
+       slog_entry *se;
+
+       for ( sl = si->si_logs; sl; sl=sl->sl_next ) {
+               /* Allocate a record. UUIDs are not NUL-terminated. */
+               se = ch_malloc( sizeof( slog_entry ) + opc->suuid.bv_len + 
+                       csn->bv_len + 1 );
+               se->se_next = NULL;
+               se->se_tag = op->o_tag;
+
+               se->se_uuid.bv_val = (char *)(se+1);
+               se->se_csn.bv_val = se->se_uuid.bv_val + opc->suuid.bv_len + 1;
+               AC_MEMCPY( se->se_uuid.bv_val, opc->suuid.bv_val, opc->suuid.bv_len );
+               se->se_uuid.bv_len = opc->suuid.bv_len;
+
+               AC_MEMCPY( se->se_csn.bv_val, csn->bv_val, csn->bv_len );
+               se->se_csn.bv_val[csn->bv_len] = '\0';
+               se->se_csn.bv_len = csn->bv_len;
+
+               ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+               if ( sl->sl_head ) {
+                       sl->sl_tail->se_next = se;
+               } else {
+                       sl->sl_head = se;
+               }
+               sl->sl_tail = se;
+               sl->sl_num++;
+               while ( sl->sl_num > sl->sl_size ) {
+                       se = sl->sl_head;
+                       sl->sl_head = se->se_next;
+                       strcpy( sl->sl_mincsn.bv_val, se->se_csn.bv_val );
+                       sl->sl_mincsn.bv_len = se->se_csn.bv_len;
+                       ch_free( se );
+                       sl->sl_num--;
+                       if ( !sl->sl_head ) {
+                               sl->sl_tail = NULL;
+                       }
+               }
+               ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+       }
+}
+
+/* Just set a flag if we found the matching entry */
+static int
+playlog_cb( Operation *op, SlapReply *rs )
+{
+       if ( rs->sr_type == REP_SEARCH ) {
+               op->o_callback->sc_private = (void *)1;
+       }
+       return rs->sr_err;
+}
+
+/* enter with sl->sl_mutex locked, release before returning */
+static void
+syncprov_playlog( Operation *op, SlapReply *rs, sessionlog *sl,
+       struct berval *oldcsn, struct berval *ctxcsn )
+{
+       slap_overinst           *on = (slap_overinst *)op->o_bd->bd_info;
+       syncprov_info_t         *si = on->on_bi.bi_private;
+       slog_entry *se;
+       int i, j, ndel, num, nmods, mmods;
+       BerVarray uuids;
+
+       num = sl->sl_num;
+       i = 0;
+       nmods = 0;
+
+       uuids = op->o_tmpalloc( (num+1) * sizeof( struct berval ) +
+               num * UUID_LEN, op->o_tmpmemctx );
+
+       uuids[0].bv_val = (char *)(uuids + num + 1);
+
+       /* Make a copy of the relevant UUIDs. Put the Deletes up front
+        * and everything else at the end. Do this first so we can
+        * unlock the list mutex.
+        */
+       for ( se=sl->sl_head; se; se=se->se_next ) {
+               if ( ber_bvcmp( &se->se_csn, oldcsn ) < 0 ) continue;
+               if ( ber_bvcmp( &se->se_csn, ctxcsn ) > 0 ) break;
+               if ( se->se_tag == LDAP_REQ_DELETE ) {
+                       j = i;
+                       i++;
+               } else {
+                       nmods++;
+                       j = num - nmods;
+               }
+               uuids[j].bv_val = uuids[0].bv_val + (j * UUID_LEN);
+               AC_MEMCPY(uuids[j].bv_val, se->se_uuid.bv_val, UUID_LEN);
+               uuids[j].bv_len = UUID_LEN;
+       }
+       ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+
+       ndel = i;
+
+       /* Mods must be validated to see if they belong in this delete set.
+        */
+
+       mmods = nmods;
+       /* Strip any duplicates */
+       for ( i=0; i<nmods; i++ ) {
+               for ( j=0; j<ndel; j++ ) {
+                       if ( bvmatch( &uuids[j], &uuids[num - 1 - i] )) {
+                               uuids[num - 1 - i].bv_len = 0;
+                               mmods --;
+                               break;
+                       }
+               }
+               if ( uuids[num - 1 - i].bv_len == 0 ) continue;
+               for ( j=0; j<i; j++ ) {
+                       if ( bvmatch( &uuids[num - 1 - j], &uuids[num - 1 - i] )) {
+                               uuids[num - 1 - i].bv_len = 0;
+                               mmods --;
+                               break;
+                       }
+               }
+       }
+
+       if ( mmods ) {
+               Operation fop;
+               SlapReply frs = { REP_RESULT };
+               int rc;
+               Filter mf, af;
+               AttributeAssertion eq;
+               slap_callback cb = {0};
+
+               fop = *op;
+
+               fop.o_sync_mode = 0;
+               fop.o_callback = &cb;
+               fop.ors_limit = NULL;
+               fop.ors_slimit = 1;
+               fop.ors_tlimit = SLAP_NO_LIMIT;
+               fop.ors_attrs = slap_anlist_all_attributes;
+               fop.ors_attrsonly = 0;
+               fop.o_managedsait = SLAP_CONTROL_CRITICAL;
+
+               af.f_choice = LDAP_FILTER_AND;
+               af.f_next = NULL;
+               af.f_and = &mf;
+               mf.f_choice = LDAP_FILTER_EQUALITY;
+               mf.f_ava = &eq;
+               mf.f_av_desc = slap_schema.si_ad_entryUUID;
+               mf.f_next = fop.ors_filter;
+
+               fop.ors_filter = &af;
+
+               cb.sc_response = playlog_cb;
+
+               for ( i=0; i<nmods; i++ ) {
+                       if ( uuids[num - 1 - 1].bv_len == 0 ) continue;
+
+                       mf.f_av_value = uuids[num -1 -i];
+                       filter2bv_x( &fop, fop.ors_filter, &fop.ors_filterstr );
+                       fop.o_bd->bd_info = on->on_info->oi_orig;
+                       cb.sc_private = NULL;
+                       rc = fop.o_bd->be_search( &fop, &frs );
+                       fop.o_bd->bd_info = (BackendInfo *)on;
+                       op->o_tmpfree( fop.ors_filterstr.bv_val, op->o_tmpmemctx );
+
+                       /* If entry was not found, add to delete list */
+                       if ( !cb.sc_private ) {
+                               uuids[ndel++] = uuids[num - 1 - i];
+                       }
+               }
+       }
+       if ( ndel ) {
+               uuids[ndel].bv_val = NULL;
+               syncprov_sendinfo( op, rs, LDAP_TAG_SYNC_ID_SET, NULL, 0, uuids, 1 );
+       }
+}
+
 static int
 syncprov_op_response( Operation *op, SlapReply *rs )
 {
@@ -1066,13 +1262,13 @@ syncprov_op_response( Operation *op, SlapReply *rs )
 
        if ( rs->sr_err == LDAP_SUCCESS )
        {
-               struct berval maxcsn;
+               struct berval maxcsn = BER_BVNULL, curcsn = BER_BVNULL;
                char cbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
 
                /* Update our context CSN */
                cbuf[0] = '\0';
                ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
-               slap_get_commit_csn( op, &maxcsn );
+               slap_get_commit_csn( op, &maxcsn, &curcsn );
                if ( !BER_BVISNULL( &maxcsn ) ) {
                        strcpy( cbuf, maxcsn.bv_val );
                        if ( ber_bvcmp( &maxcsn, &si->si_ctxcsn ) > 0 ) {
@@ -1127,6 +1323,11 @@ syncprov_op_response( Operation *op, SlapReply *rs )
                        }
                }
 
+               /* Add any log records */
+               if ( si->si_logs && op->o_tag != LDAP_REQ_ADD ) {
+                       syncprov_add_slog( op, &curcsn );
+               }
+
        }
        return SLAP_CB_CONTINUE;
 }
@@ -1255,11 +1456,12 @@ syncprov_op_mod( Operation *op, SlapReply *rs )
                        avl_insert( &si->si_mods, mt, sp_avl_cmp, avl_dup_error );
                        ldap_pvt_thread_mutex_unlock( &si->si_mods_mutex );
                }
-
-               if ( op->o_tag != LDAP_REQ_ADD )
-                       syncprov_matchops( op, opc, 1 );
        }
 
+       if (( si->si_ops || si->si_logs ) && op->o_tag != LDAP_REQ_ADD )
+               syncprov_matchops( op, opc, 1 );
+               
+
        return SLAP_CB_CONTINUE;
 }
 
@@ -1275,6 +1477,7 @@ syncprov_op_extended( Operation *op, SlapReply *rs )
 typedef struct searchstate {
        slap_overinst *ss_on;
        syncops *ss_so;
+       int ss_present;
 } searchstate;
 
 static int
@@ -1390,11 +1593,12 @@ syncprov_search_response( Operation *op, SlapReply *rs )
                                op->o_tmpmemctx );
                        rs->sr_ctrls[1] = NULL;
                        rs->sr_err = syncprov_done_ctrl( op, rs, rs->sr_ctrls,
-                               0, 1, &cookie, LDAP_SYNC_REFRESH_PRESENTS );
+                               0, 1, &cookie, ss->ss_present ?  LDAP_SYNC_REFRESH_PRESENTS :
+                                       LDAP_SYNC_REFRESH_DELETES );
                } else {
                        int locked = 0;
                /* It's RefreshAndPersist, transition to Persist phase */
-                       syncprov_sendinfo( op, rs, rs->sr_nentries ?
+                       syncprov_sendinfo( op, rs, ss->ss_present ?
                                LDAP_TAG_SYNC_REFRESH_PRESENT : LDAP_TAG_SYNC_REFRESH_DELETE,
                                &cookie, 1, NULL, 0 );
                        /* Flush any queued persist messages */
@@ -1457,11 +1661,13 @@ syncprov_op_search( Operation *op, SlapReply *rs )
        slap_overinst           *on = (slap_overinst *)op->o_bd->bd_info;
        syncprov_info_t         *si = (syncprov_info_t *)on->on_bi.bi_private;
        slap_callback   *cb;
-       int gotstate = 0, nochange = 0;
+       int gotstate = 0, nochange = 0, do_present = 1;
        Filter *fand, *fava;
        syncops *sop = NULL;
        searchstate *ss;
        sync_control *srs;
+       struct berval ctxcsn;
+       char csnbuf[LDAP_LUTIL_CSNSTR_BUFSIZE];
 
        if ( !(op->o_sync_mode & SLAP_SYNC_REFRESH) ) return SLAP_CB_CONTINUE;
 
@@ -1508,16 +1714,24 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                ldap_pvt_thread_mutex_unlock( &si->si_ops_mutex );
        }
 
-       /* If we have a cookie, handle the PRESENT lookups
-        */
+       /* snapshot the ctxcsn */
+       ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
+       strcpy( csnbuf, si->si_ctxcsnbuf );
+       ctxcsn.bv_len = si->si_ctxcsn.bv_len;
+       ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+       ctxcsn.bv_val = csnbuf;
+       
+       /* If we have a cookie, handle the PRESENT lookups */
        if ( srs->sr_state.ctxcsn ) {
+               sessionlog *sl;
+
                /* Is the CSN in a valid format? */
                if ( srs->sr_state.ctxcsn->bv_len >= LDAP_LUTIL_CSNSTR_BUFSIZE ) {
                        send_ldap_error( op, rs, LDAP_OTHER, "invalid sync cookie" );
                        return rs->sr_err;
                }
                /* If just Refreshing and nothing has changed, shortcut it */
-               if ( bvmatch( srs->sr_state.ctxcsn, &si->si_ctxcsn )) {
+               if ( bvmatch( srs->sr_state.ctxcsn, &ctxcsn )) {
                        nochange = 1;
                        if ( !(op->o_sync_mode & SLAP_SYNC_PERSIST) ) {
                                LDAPControl     *ctrls[2];
@@ -1534,6 +1748,19 @@ syncprov_op_search( Operation *op, SlapReply *rs )
                        }
                        goto shortcut;
                }
+               /* Do we have a sessionlog for this search? */
+               for ( sl=si->si_logs; sl; sl=sl->sl_next )
+                       if ( sl->sl_sid == srs->sr_state.sid ) break;
+               if ( sl ) {
+                       ldap_pvt_thread_mutex_lock( &sl->sl_mutex );
+                       if ( ber_bvcmp( srs->sr_state.ctxcsn, &sl->sl_mincsn ) >= 0 ) {
+                               do_present = 0;
+                               /* mutex is unlocked in playlog */
+                               syncprov_playlog( op, rs, sl, srs->sr_state.ctxcsn, &ctxcsn );
+                       } else {
+                               ldap_pvt_thread_mutex_unlock( &sl->sl_mutex );
+                       }
+               }
                /* Is the CSN still present in the database? */
                if ( syncprov_findcsn( op, FIND_CSN ) != LDAP_SUCCESS ) {
                        /* No, so a reload is required */
@@ -1545,8 +1772,9 @@ syncprov_op_search( Operation *op, SlapReply *rs )
 #endif
                } else {
                        gotstate = 1;
-                       /* If context has changed, check for Present UUIDs */
-                       if ( syncprov_findcsn( op, FIND_PRESENT ) != LDAP_SUCCESS ) {
+                       /* If changed and doing Present lookup, send Present UUIDs */
+                       if ( do_present && syncprov_findcsn( op, FIND_PRESENT ) !=
+                               LDAP_SUCCESS ) {
                                send_ldap_result( op, rs );
                                return rs->sr_err;
                        }
@@ -1567,9 +1795,7 @@ syncprov_op_search( Operation *op, SlapReply *rs )
        fava->f_choice = LDAP_FILTER_LE;
        fava->f_ava = op->o_tmpalloc( sizeof(AttributeAssertion), op->o_tmpmemctx );
        fava->f_ava->aa_desc = slap_schema.si_ad_entryCSN;
-       ldap_pvt_thread_mutex_lock( &si->si_csn_mutex );
-       ber_dupbv_x( &fava->f_ava->aa_value, &si->si_ctxcsn, op->o_tmpmemctx );
-       ldap_pvt_thread_mutex_unlock( &si->si_csn_mutex );
+       ber_dupbv_x( &fava->f_ava->aa_value, &ctxcsn, op->o_tmpmemctx );
        fand->f_and = fava;
        if ( gotstate ) {
                fava->f_next = op->o_tmpalloc( sizeof(Filter), op->o_tmpmemctx );
@@ -1589,6 +1815,7 @@ shortcut:
        ss = (searchstate *)(cb+1);
        ss->ss_on = on;
        ss->ss_so = sop;
+       ss->ss_present = do_present;
        cb->sc_response = syncprov_search_response;
        cb->sc_cleanup = syncprov_search_cleanup;
        cb->sc_private = ss;
@@ -1679,6 +1906,47 @@ syncprov_db_config(
                si->si_chktime = atoi( argv[2] ) * 60;
                return 0;
 
+       } else if ( strcasecmp( argv[0], "syncprov-sessionlog" ) == 0 ) {
+               sessionlog *sl;
+               int sid, size;
+               if ( argc != 3 ) {
+                       fprintf( stderr, "%s: line %d: wrong number of arguments in "
+                               "\"syncprov-sessionlog <sid> <size>\"\n", fname, lineno );
+                       return -1;
+               }
+               sid = atoi( argv[1] );
+               if ( sid < 0 || sid > 999 ) {
+                       fprintf( stderr,
+                               "%s: line %d: session log id %d is out of range [0..999]\n",
+                               fname, lineno, sid );
+                       return -1;
+               }
+               size = atoi( argv[2] );
+               if ( size < 0 ) {
+                       fprintf( stderr,
+                               "%s: line %d: session log size %d is negative\n",
+                               fname, lineno, size );
+                       return -1;
+               }
+               for ( sl = si->si_logs; sl; sl=sl->sl_next ) {
+                       if ( sl->sl_sid == sid ) {
+                               sl->sl_size = size;
+                               break;
+                       }
+               }
+               if ( !sl ) {
+                       sl = ch_malloc( sizeof( sessionlog ) + LDAP_LUTIL_CSNSTR_BUFSIZE );
+                       sl->sl_mincsn.bv_val = (char *)(sl+1);
+                       sl->sl_mincsn.bv_len = 0;
+                       sl->sl_sid = sid;
+                       sl->sl_size = size;
+                       sl->sl_num = 0;
+                       sl->sl_head = sl->sl_tail = NULL;
+                       sl->sl_next = si->si_logs;
+                       ldap_pvt_thread_mutex_init( &sl->sl_mutex );
+                       si->si_logs = sl;
+               }
+               return 0;
        }
 
        return SLAP_CONF_UNKNOWN;
index 09a1851ee607ff44b6b92b9e23f5c834f34032da..a501942d4e64860c10e50d4431ea57229dcca2ad 100644 (file)
@@ -2065,11 +2065,12 @@ typedef struct slap_paged_state {
 #define LDAP_PSEARCH_BY_SCOPEOUT       0x05
 #define LDAP_PSEARCH_BY_PREDELETE      0x06
 
-struct psid_entry {
+struct psid_entry {            /* DELETE ME */
        struct slap_op *ps_op;
        LDAP_LIST_ENTRY(psid_entry) ps_link;
 };
 
+#if 0  /* DELETE ME */
 struct slog_entry {
        struct berval sl_uuid;
        struct berval sl_name;
@@ -2084,6 +2085,7 @@ struct slap_session_entry {
        struct berval se_spec;
        LDAP_LIST_ENTRY( slap_session_entry ) se_link;
 };
+#endif
 
 struct slap_csn_entry {
        struct berval ce_csn;