]> git.sur5r.net Git - openldap/commitdiff
Use runqueue for consistency checker
authorHoward Chu <hyc@openldap.org>
Sat, 6 Dec 2003 23:32:56 +0000 (23:32 +0000)
committerHoward Chu <hyc@openldap.org>
Sat, 6 Dec 2003 23:32:56 +0000 (23:32 +0000)
servers/slapd/overlays/pcache.c

index 1f27391e47d073518b0bd0aec8ea09848b011689..bda597b379594da9f0eb90f5419b3082764dd01c 100644 (file)
@@ -33,6 +33,7 @@
 #include "slap.h"
 #include "ldap_pvt.h"
 #include "lutil.h"
+#include "ldap_rq.h"
 
 /* query cache structs */
 /* query */
@@ -120,8 +121,8 @@ typedef struct cache_manager_s {
         int     num_entries_limit;             /* max # of entries in a cacheable query */ 
 
        int     cc_period;              /* interval between successive consistency checks (sec) */ 
-       int     cc_thread_started; 
-       ldap_pvt_thread_t   cc_thread; 
+       int     cc_paused;
+       void    *cc_arg;
 
        ldap_pvt_thread_mutex_t         cache_mutex;            
        ldap_pvt_thread_mutex_t         remove_mutex;
@@ -136,6 +137,7 @@ static char *queryid_schema = "( 1.3.6.1.4.1.4203.666.1.12 NAME 'queryid' "
                        "SYNTAX 1.3.6.1.4.1.1466.115.121.1.40{64} "
                        "NO-USER-MODIFICATION USAGE directoryOperation )";
 
+/* Return 1 for an added entry, else 0 */
 static int
 merge_entry(
        Operation               *op,
@@ -179,19 +181,24 @@ merge_entry(
                        modlist->sml_op = LDAP_MOD_ADD;
                        op->o_tag = LDAP_REQ_MODIFY;
                        op->orm_modlist = modlist;
-                       rc = op->o_bd->be_modify( op, &sreply );
+                       op->o_bd->be_modify( op, &sreply );
                        slap_mods_free( modlist );
                } else if ( rc == LDAP_REFERRAL ||
                                        rc == LDAP_NO_SUCH_OBJECT ) {
                        syncrepl_add_glue( op, e );
                        e = NULL;
+                       rc = 1;
+               }
+               if ( e ) {
+                       entry_free( e );
+                       rc = 0;
                }
-               if ( e ) entry_free( e );
        } else {
                be_entry_release_w( op, e );
+               rc = 1;
        }
 
-       return 0;
+       return rc;
 }
 
 /* compare base and scope of incoming and cached queries */
@@ -960,11 +967,6 @@ is_temp_answerable(
                strcasecmp(qt->querystr.bv_val, tempstr->bv_val) == 0);
 }
 
-static void*
-consistency_check(
-       void    *op
-); 
-
 static int 
 filter2template(
        Filter                  *f,
@@ -1161,41 +1163,19 @@ cache_entries(
                }
 
                return_val = merge_entry(&op_tmp, e, query_uuid);
-#ifdef NEW_LOGGING
-               LDAP_LOG( BACK_META, DETAIL1,
-                       "ENTRY ADDED/MERGED, CACHE =%d entries\n",
-                       cm->cur_entries, 0, 0 );
-#else /* !NEW_LOGGING */
-               Debug( LDAP_DEBUG_ANY,
-                       "ENTRY ADDED/MERGED, CACHE =%d entries\n",
-                       cm->cur_entries, 0, 0 );
-#endif /* !NEW_LOGGING */
-#ifdef NEW_LOGGING
-               LDAP_LOG( BACK_META, DETAIL2, "UNLOCKING REMOVE MUTEX\n",
-                               0, 0, 0 );
-#else /* !NEW_LOGGING */
-               Debug( LDAP_DEBUG_NONE, "UNLOCKING REMOVE MUTEX\n", 0, 0, 0 );
-#endif /* !NEW_LOGGING */
                ldap_pvt_thread_mutex_unlock(&cm->remove_mutex); 
-#ifdef NEW_LOGGING
-               LDAP_LOG( BACK_META, DETAIL2, "UNLOCKED REMOVE MUTEX\n",
-                               0, 0, 0 );
-#else /* !NEW_LOGGING */
-               Debug( LDAP_DEBUG_NONE, "UNLOCKED REMOVE MUTEX\n", 0, 0, 0 );
-#endif /* !NEW_LOGGING */
-               if (return_val)
-                       return 0; 
                ldap_pvt_thread_mutex_lock(&cm->cache_mutex); 
-               cm->cur_entries++;
+               cm->cur_entries += return_val;
 #ifdef NEW_LOGGING
                LDAP_LOG( BACK_META, DETAIL1,
-                       "ENTRY ADDED/MERGED, SIZE=%d, CACHED ENTRIES=%d\n",
-                       return_val, cm->cur_entries, 0 );
+                       "ENTRY ADDED/MERGED, CACHED ENTRIES=%d\n",
+                       cm->cur_entries, 0, 0 );
 #else /* !NEW_LOGGING */
                Debug( LDAP_DEBUG_ANY,
-                       "ENTRY ADDED/MERGED, SIZE=%d, CACHED ENTRIES=%d\n",
-                       return_val, cm->cur_entries, 0 );
+                       "ENTRY ADDED/MERGED, CACHED ENTRIES=%d\n",
+                       cm->cur_entries, 0, 0 );
 #endif /* !NEW_LOGGING */
+               return_val = 0;
                ldap_pvt_thread_mutex_unlock(&cm->cache_mutex); 
        }
        ldap_pvt_thread_mutex_lock(&cm->cache_mutex); 
@@ -1248,8 +1228,20 @@ proxy_cache_response(
                        }
                }
        } else if ( rs->sr_type == REP_RESULT && !si->over ) {
-               if ( cache_entries( op, rs, &uuid ) == 0)
+               if ( cache_entries( op, rs, &uuid ) == 0) {
                        qm->addfunc(qm, &si->query, si->template_id, &uuid);
+                       /* If the consistency checker suspended itself,
+                        * wake it back up
+                        */
+                       if ( cm->cc_paused ) {
+                               ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+                               if ( cm->cc_paused ) {
+                                       cm->cc_paused = 0;
+                                       ldap_pvt_runqueue_resched( &syncrepl_rq, cm->cc_arg, 0 );
+                               }
+                               ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
+                       }
+               }
        }
        return SLAP_CB_CONTINUE;
 }
@@ -1467,10 +1459,10 @@ proxy_cache_search(
        } else {
 #ifdef NEW_LOGGING
                LDAP_LOG( BACK_META, DETAIL1,
-                                       "QUERY NOT CACHEABLE no\n",
+                                       "QUERY NOT CACHEABLE\n",
                                        0, 0, 0);
 #else /* !NEW_LOGGING */
-               Debug( LDAP_DEBUG_ANY, "QUERY NOT CACHEABLE no\n",
+               Debug( LDAP_DEBUG_ANY, "QUERY NOT CACHEABLE\n",
                                        0, 0, 0);
 #endif /* !NEW_LOGGING */
        }
@@ -1525,101 +1517,118 @@ get_attr_set(
 }
 
 static void* 
-consistency_check(void* operation)
+consistency_check(
+       void *ctx,
+       void *arg )
 {
-       Operation op = *(Operation*)operation; 
-       BackendDB be = *op.o_bd;
+       struct re_s *rtask = arg;
+       slap_overinst *on = rtask->arg;
+       cache_manager *cm = on->on_bi.bi_private;
+       query_manager *qm = cm->qm; 
+       Operation op = {0};
+       BackendDB be = on->on_info->oi_bd;
 
        SlapReply rs = {REP_RESULT}; 
-
-       slap_overinst *on = (slap_overinst *)op.o_bd->bd_info;
-       cache_manager*  cm = on->on_bi.bi_private;
-       query_manager* qm = cm->qm; 
        CachedQuery* query, *query_prev; 
-       time_t curr_time; 
-       struct berval uuid;  
-       int i, return_val; 
+       int i, return_val, pause = 1; 
        QueryTemplate* templ;
 
        op.o_bd = &be;
+       op.o_dn = be.be_rootdn;
+       op.o_ndn = be.be_rootndn;
+       op.o_threadctx = ctx;
+
+       op.o_tmpmemctx = sl_mem_create( SLMALLOC_SLAB_SIZE, ctx );
+       op.o_tmpmfuncs = &sl_mfuncs;
+
        be.bd_info = cm->bi;
        be.be_private = cm->be_private;
       
-        for(;;) {
-               ldap_pvt_thread_sleep(cm->cc_period);     
-               for (i=0; qm->templates[i].querystr.bv_val; i++) {
-                       templ = qm->templates + i; 
-                       query = templ->query_last; 
-                       curr_time = slap_get_time(); 
-                       ldap_pvt_thread_mutex_lock(&cm->remove_mutex); 
-                       while (query && (query->expiry_time < curr_time)) {
-                               ldap_pvt_thread_mutex_lock(&qm->lru_mutex); 
-                               remove_query(qm, query); 
-                               ldap_pvt_thread_mutex_unlock(&qm->lru_mutex); 
+       cm->cc_arg = arg;
+
+       for (i=0; qm->templates[i].querystr.bv_val; i++) {
+               templ = qm->templates + i; 
+               query = templ->query_last; 
+               if ( query ) pause = 0;
+               op.o_time = slap_get_time(); 
+               ldap_pvt_thread_mutex_lock(&cm->remove_mutex); 
+               while (query && (query->expiry_time < op.o_time)) {
+                       ldap_pvt_thread_mutex_lock(&qm->lru_mutex); 
+                       remove_query(qm, query); 
+                       ldap_pvt_thread_mutex_unlock(&qm->lru_mutex); 
 #ifdef NEW_LOGGING
-                               LDAP_LOG( BACK_META, DETAIL1, "Lock CR index = %d\n",
-                                               i, 0, 0 );
+                       LDAP_LOG( BACK_META, DETAIL1, "Lock CR index = %d\n",
+                                       i, 0, 0 );
 #else /* !NEW_LOGGING */
-                               Debug( LDAP_DEBUG_ANY, "Lock CR index = %d\n",
-                                               i, 0, 0 );
+                       Debug( LDAP_DEBUG_ANY, "Lock CR index = %d\n",
+                                       i, 0, 0 );
 #endif /* !NEW_LOGGING */
-                               ldap_pvt_thread_rdwr_wlock(&templ->t_rwlock);  
-                               remove_from_template(query, templ); 
+                       ldap_pvt_thread_rdwr_wlock(&templ->t_rwlock);  
+                       remove_from_template(query, templ); 
 #ifdef NEW_LOGGING
-                               LDAP_LOG( BACK_META, DETAIL1,
-                                               "TEMPLATE %d QUERIES-- %d\n",
-                                               i, templ->no_of_queries, 0 );
+                       LDAP_LOG( BACK_META, DETAIL1,
+                                       "TEMPLATE %d QUERIES-- %d\n",
+                                       i, templ->no_of_queries, 0 );
 #else /* !NEW_LOGGING */
-                               Debug( LDAP_DEBUG_ANY, "TEMPLATE %d QUERIES-- %d\n",
-                                               i, templ->no_of_queries, 0 );
+                       Debug( LDAP_DEBUG_ANY, "TEMPLATE %d QUERIES-- %d\n",
+                                       i, templ->no_of_queries, 0 );
 #endif /* !NEW_LOGGING */
 #ifdef NEW_LOGGING
-                               LDAP_LOG( BACK_META, DETAIL1, "Unlock CR index = %d\n",
-                                               i, 0, 0 );
+                       LDAP_LOG( BACK_META, DETAIL1, "Unlock CR index = %d\n",
+                                       i, 0, 0 );
 #else /* !NEW_LOGGING */
-                               Debug( LDAP_DEBUG_ANY, "Unlock CR index = %d\n",
-                                               i, 0, 0 );
+                       Debug( LDAP_DEBUG_ANY, "Unlock CR index = %d\n",
+                                       i, 0, 0 );
 #endif /* !NEW_LOGGING */
-                               ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock);  
-                               uuid = query->q_uuid; 
-                               return_val = remove_query_data(&op, &rs, &uuid);
+                       ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock);  
+                       return_val = remove_query_data(&op, &rs, &query->q_uuid);
 #ifdef NEW_LOGGING
-                               LDAP_LOG( BACK_META, DETAIL1,
-                                               "STALE QUERY REMOVED, SIZE=%d\n",
-                                               return_val, 0, 0 );
+                       LDAP_LOG( BACK_META, DETAIL1,
+                                       "STALE QUERY REMOVED, SIZE=%d\n",
+                                       return_val, 0, 0 );
 #else /* !NEW_LOGGING */
-                               Debug( LDAP_DEBUG_ANY, "STALE QUERY REMOVED, SIZE=%d\n",
-                                                       return_val, 0, 0 );
+                       Debug( LDAP_DEBUG_ANY, "STALE QUERY REMOVED, SIZE=%d\n",
+                                               return_val, 0, 0 );
 #endif /* !NEW_LOGGING */
-                               ldap_pvt_thread_mutex_lock(&cm->cache_mutex); 
-                               cm->cur_entries -= return_val;
-                               cm->num_cached_queries--; 
+                       ldap_pvt_thread_mutex_lock(&cm->cache_mutex); 
+                       cm->cur_entries -= return_val;
+                       cm->num_cached_queries--; 
 #ifdef NEW_LOGGING
-                               LDAP_LOG( BACK_META, DETAIL1, "STORED QUERIES = %lu\n",
-                                               cm->num_cached_queries, 0, 0 );
+                       LDAP_LOG( BACK_META, DETAIL1, "STORED QUERIES = %lu\n",
+                                       cm->num_cached_queries, 0, 0 );
 #else /* !NEW_LOGGING */
-                               Debug( LDAP_DEBUG_ANY, "STORED QUERIES = %lu\n",
-                                               cm->num_cached_queries, 0, 0 );
+                       Debug( LDAP_DEBUG_ANY, "STORED QUERIES = %lu\n",
+                                       cm->num_cached_queries, 0, 0 );
 #endif /* !NEW_LOGGING */
-                               ldap_pvt_thread_mutex_unlock(&cm->cache_mutex); 
+                       ldap_pvt_thread_mutex_unlock(&cm->cache_mutex); 
 #ifdef NEW_LOGGING
-                               LDAP_LOG( BACK_META, DETAIL1,
-                                       "STALE QUERY REMOVED, CACHE ="
-                                       "%d entries\n", 
-                                       cm->cur_entries, 0, 0 );
+                       LDAP_LOG( BACK_META, DETAIL1,
+                               "STALE QUERY REMOVED, CACHE ="
+                               "%d entries\n", 
+                               cm->cur_entries, 0, 0 );
 #else /* !NEW_LOGGING */
-                               Debug( LDAP_DEBUG_ANY,
-                                       "STALE QUERY REMOVED, CACHE ="
-                                       "%d entries\n", 
-                                       cm->cur_entries, 0, 0 );
+                       Debug( LDAP_DEBUG_ANY,
+                               "STALE QUERY REMOVED, CACHE ="
+                               "%d entries\n", 
+                               cm->cur_entries, 0, 0 );
 #endif /* !NEW_LOGGING */
-                               query_prev = query; 
-                               query = query->prev; 
-                               free_query(query_prev); 
-                       }
-                       ldap_pvt_thread_mutex_unlock(&cm->remove_mutex); 
+                       query_prev = query; 
+                       query = query->prev; 
+                       free_query(query_prev); 
+               }
+               ldap_pvt_thread_mutex_unlock(&cm->remove_mutex); 
+       }
+       /* If there were no queries, defer processing for a while */
+       if ( pause ) {
+               ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+               cm->cc_paused = 1;
+               if ( ldap_pvt_runqueue_isrunning( &syncrepl_rq, rtask )) {
+                       ldap_pvt_runqueue_stoptask( &syncrepl_rq, rtask );
                }
+               ldap_pvt_runqueue_resched( &syncrepl_rq, rtask, 1 );
+               ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
        }
+       return NULL;
 }
 
 
@@ -1842,8 +1851,8 @@ proxy_cache_init(
        cm->max_entries = 0; 
        cm->cur_entries = 0; 
        cm->max_queries = 10000; 
-       cm->cc_thread_started = 0; 
        cm->cc_period = 1000; 
+       cm->cc_paused = 0; 
        
        qm->attr_sets = NULL; 
        qm->templates = NULL; 
@@ -1875,6 +1884,12 @@ proxy_cache_open(
                rc = cm->bi->bi_db_open( be );
                be->be_private = private;
        }
+
+       ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+       ldap_pvt_runqueue_insert( &syncrepl_rq, cm->cc_period,
+               consistency_check, on );
+       ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
+
        return rc;
 }