#include "slap.h"
#include "ldap_pvt.h"
#include "lutil.h"
+#include "ldap_rq.h"
/* query cache structs */
/* query */
int num_entries_limit; /* max # of entries in a cacheable query */
int cc_period; /* interval between successive consistency checks (sec) */
- int cc_thread_started;
- ldap_pvt_thread_t cc_thread;
+ int cc_paused;
+ void *cc_arg;
ldap_pvt_thread_mutex_t cache_mutex;
ldap_pvt_thread_mutex_t remove_mutex;
"SYNTAX 1.3.6.1.4.1.1466.115.121.1.40{64} "
"NO-USER-MODIFICATION USAGE directoryOperation )";
+/* Return 1 for an added entry, else 0 */
static int
merge_entry(
Operation *op,
modlist->sml_op = LDAP_MOD_ADD;
op->o_tag = LDAP_REQ_MODIFY;
op->orm_modlist = modlist;
- rc = op->o_bd->be_modify( op, &sreply );
+ op->o_bd->be_modify( op, &sreply );
slap_mods_free( modlist );
} else if ( rc == LDAP_REFERRAL ||
rc == LDAP_NO_SUCH_OBJECT ) {
syncrepl_add_glue( op, e );
e = NULL;
+ rc = 1;
+ }
+ if ( e ) {
+ entry_free( e );
+ rc = 0;
}
- if ( e ) entry_free( e );
} else {
be_entry_release_w( op, e );
+ rc = 1;
}
- return 0;
+ return rc;
}
/* compare base and scope of incoming and cached queries */
strcasecmp(qt->querystr.bv_val, tempstr->bv_val) == 0);
}
-static void*
-consistency_check(
- void *op
-);
-
static int
filter2template(
Filter *f,
}
return_val = merge_entry(&op_tmp, e, query_uuid);
-#ifdef NEW_LOGGING
- LDAP_LOG( BACK_META, DETAIL1,
- "ENTRY ADDED/MERGED, CACHE =%d entries\n",
- cm->cur_entries, 0, 0 );
-#else /* !NEW_LOGGING */
- Debug( LDAP_DEBUG_ANY,
- "ENTRY ADDED/MERGED, CACHE =%d entries\n",
- cm->cur_entries, 0, 0 );
-#endif /* !NEW_LOGGING */
-#ifdef NEW_LOGGING
- LDAP_LOG( BACK_META, DETAIL2, "UNLOCKING REMOVE MUTEX\n",
- 0, 0, 0 );
-#else /* !NEW_LOGGING */
- Debug( LDAP_DEBUG_NONE, "UNLOCKING REMOVE MUTEX\n", 0, 0, 0 );
-#endif /* !NEW_LOGGING */
ldap_pvt_thread_mutex_unlock(&cm->remove_mutex);
-#ifdef NEW_LOGGING
- LDAP_LOG( BACK_META, DETAIL2, "UNLOCKED REMOVE MUTEX\n",
- 0, 0, 0 );
-#else /* !NEW_LOGGING */
- Debug( LDAP_DEBUG_NONE, "UNLOCKED REMOVE MUTEX\n", 0, 0, 0 );
-#endif /* !NEW_LOGGING */
- if (return_val)
- return 0;
ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
- cm->cur_entries++;
+ cm->cur_entries += return_val;
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1,
- "ENTRY ADDED/MERGED, SIZE=%d, CACHED ENTRIES=%d\n",
- return_val, cm->cur_entries, 0 );
+ "ENTRY ADDED/MERGED, CACHED ENTRIES=%d\n",
+ cm->cur_entries, 0, 0 );
#else /* !NEW_LOGGING */
Debug( LDAP_DEBUG_ANY,
- "ENTRY ADDED/MERGED, SIZE=%d, CACHED ENTRIES=%d\n",
- return_val, cm->cur_entries, 0 );
+ "ENTRY ADDED/MERGED, CACHED ENTRIES=%d\n",
+ cm->cur_entries, 0, 0 );
#endif /* !NEW_LOGGING */
+ return_val = 0;
ldap_pvt_thread_mutex_unlock(&cm->cache_mutex);
}
ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
}
}
} else if ( rs->sr_type == REP_RESULT && !si->over ) {
- if ( cache_entries( op, rs, &uuid ) == 0)
+ if ( cache_entries( op, rs, &uuid ) == 0) {
qm->addfunc(qm, &si->query, si->template_id, &uuid);
+ /* If the consistency checker suspended itself,
+ * wake it back up
+ */
+ if ( cm->cc_paused ) {
+ ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+ if ( cm->cc_paused ) {
+ cm->cc_paused = 0;
+ ldap_pvt_runqueue_resched( &syncrepl_rq, cm->cc_arg, 0 );
+ }
+ ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
+ }
+ }
}
return SLAP_CB_CONTINUE;
}
} else {
#ifdef NEW_LOGGING
LDAP_LOG( BACK_META, DETAIL1,
- "QUERY NOT CACHEABLE no\n",
+ "QUERY NOT CACHEABLE\n",
0, 0, 0);
#else /* !NEW_LOGGING */
- Debug( LDAP_DEBUG_ANY, "QUERY NOT CACHEABLE no\n",
+ Debug( LDAP_DEBUG_ANY, "QUERY NOT CACHEABLE\n",
0, 0, 0);
#endif /* !NEW_LOGGING */
}
}
static void*
-consistency_check(void* operation)
+consistency_check(
+ void *ctx,
+ void *arg )
{
- Operation op = *(Operation*)operation;
- BackendDB be = *op.o_bd;
+ struct re_s *rtask = arg;
+ slap_overinst *on = rtask->arg;
+ cache_manager *cm = on->on_bi.bi_private;
+ query_manager *qm = cm->qm;
+ Operation op = {0};
+ BackendDB be = on->on_info->oi_bd;
SlapReply rs = {REP_RESULT};
-
- slap_overinst *on = (slap_overinst *)op.o_bd->bd_info;
- cache_manager* cm = on->on_bi.bi_private;
- query_manager* qm = cm->qm;
CachedQuery* query, *query_prev;
- time_t curr_time;
- struct berval uuid;
- int i, return_val;
+ int i, return_val, pause = 1;
QueryTemplate* templ;
op.o_bd = &be;
+ op.o_dn = be.be_rootdn;
+ op.o_ndn = be.be_rootndn;
+ op.o_threadctx = ctx;
+
+ op.o_tmpmemctx = sl_mem_create( SLMALLOC_SLAB_SIZE, ctx );
+ op.o_tmpmfuncs = &sl_mfuncs;
+
be.bd_info = cm->bi;
be.be_private = cm->be_private;
- for(;;) {
- ldap_pvt_thread_sleep(cm->cc_period);
- for (i=0; qm->templates[i].querystr.bv_val; i++) {
- templ = qm->templates + i;
- query = templ->query_last;
- curr_time = slap_get_time();
- ldap_pvt_thread_mutex_lock(&cm->remove_mutex);
- while (query && (query->expiry_time < curr_time)) {
- ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
- remove_query(qm, query);
- ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
+ cm->cc_arg = arg;
+
+ for (i=0; qm->templates[i].querystr.bv_val; i++) {
+ templ = qm->templates + i;
+ query = templ->query_last;
+ if ( query ) pause = 0;
+ op.o_time = slap_get_time();
+ ldap_pvt_thread_mutex_lock(&cm->remove_mutex);
+ while (query && (query->expiry_time < op.o_time)) {
+ ldap_pvt_thread_mutex_lock(&qm->lru_mutex);
+ remove_query(qm, query);
+ ldap_pvt_thread_mutex_unlock(&qm->lru_mutex);
#ifdef NEW_LOGGING
- LDAP_LOG( BACK_META, DETAIL1, "Lock CR index = %d\n",
- i, 0, 0 );
+ LDAP_LOG( BACK_META, DETAIL1, "Lock CR index = %d\n",
+ i, 0, 0 );
#else /* !NEW_LOGGING */
- Debug( LDAP_DEBUG_ANY, "Lock CR index = %d\n",
- i, 0, 0 );
+ Debug( LDAP_DEBUG_ANY, "Lock CR index = %d\n",
+ i, 0, 0 );
#endif /* !NEW_LOGGING */
- ldap_pvt_thread_rdwr_wlock(&templ->t_rwlock);
- remove_from_template(query, templ);
+ ldap_pvt_thread_rdwr_wlock(&templ->t_rwlock);
+ remove_from_template(query, templ);
#ifdef NEW_LOGGING
- LDAP_LOG( BACK_META, DETAIL1,
- "TEMPLATE %d QUERIES-- %d\n",
- i, templ->no_of_queries, 0 );
+ LDAP_LOG( BACK_META, DETAIL1,
+ "TEMPLATE %d QUERIES-- %d\n",
+ i, templ->no_of_queries, 0 );
#else /* !NEW_LOGGING */
- Debug( LDAP_DEBUG_ANY, "TEMPLATE %d QUERIES-- %d\n",
- i, templ->no_of_queries, 0 );
+ Debug( LDAP_DEBUG_ANY, "TEMPLATE %d QUERIES-- %d\n",
+ i, templ->no_of_queries, 0 );
#endif /* !NEW_LOGGING */
#ifdef NEW_LOGGING
- LDAP_LOG( BACK_META, DETAIL1, "Unlock CR index = %d\n",
- i, 0, 0 );
+ LDAP_LOG( BACK_META, DETAIL1, "Unlock CR index = %d\n",
+ i, 0, 0 );
#else /* !NEW_LOGGING */
- Debug( LDAP_DEBUG_ANY, "Unlock CR index = %d\n",
- i, 0, 0 );
+ Debug( LDAP_DEBUG_ANY, "Unlock CR index = %d\n",
+ i, 0, 0 );
#endif /* !NEW_LOGGING */
- ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock);
- uuid = query->q_uuid;
- return_val = remove_query_data(&op, &rs, &uuid);
+ ldap_pvt_thread_rdwr_wunlock(&templ->t_rwlock);
+ return_val = remove_query_data(&op, &rs, &query->q_uuid);
#ifdef NEW_LOGGING
- LDAP_LOG( BACK_META, DETAIL1,
- "STALE QUERY REMOVED, SIZE=%d\n",
- return_val, 0, 0 );
+ LDAP_LOG( BACK_META, DETAIL1,
+ "STALE QUERY REMOVED, SIZE=%d\n",
+ return_val, 0, 0 );
#else /* !NEW_LOGGING */
- Debug( LDAP_DEBUG_ANY, "STALE QUERY REMOVED, SIZE=%d\n",
- return_val, 0, 0 );
+ Debug( LDAP_DEBUG_ANY, "STALE QUERY REMOVED, SIZE=%d\n",
+ return_val, 0, 0 );
#endif /* !NEW_LOGGING */
- ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
- cm->cur_entries -= return_val;
- cm->num_cached_queries--;
+ ldap_pvt_thread_mutex_lock(&cm->cache_mutex);
+ cm->cur_entries -= return_val;
+ cm->num_cached_queries--;
#ifdef NEW_LOGGING
- LDAP_LOG( BACK_META, DETAIL1, "STORED QUERIES = %lu\n",
- cm->num_cached_queries, 0, 0 );
+ LDAP_LOG( BACK_META, DETAIL1, "STORED QUERIES = %lu\n",
+ cm->num_cached_queries, 0, 0 );
#else /* !NEW_LOGGING */
- Debug( LDAP_DEBUG_ANY, "STORED QUERIES = %lu\n",
- cm->num_cached_queries, 0, 0 );
+ Debug( LDAP_DEBUG_ANY, "STORED QUERIES = %lu\n",
+ cm->num_cached_queries, 0, 0 );
#endif /* !NEW_LOGGING */
- ldap_pvt_thread_mutex_unlock(&cm->cache_mutex);
+ ldap_pvt_thread_mutex_unlock(&cm->cache_mutex);
#ifdef NEW_LOGGING
- LDAP_LOG( BACK_META, DETAIL1,
- "STALE QUERY REMOVED, CACHE ="
- "%d entries\n",
- cm->cur_entries, 0, 0 );
+ LDAP_LOG( BACK_META, DETAIL1,
+ "STALE QUERY REMOVED, CACHE ="
+ "%d entries\n",
+ cm->cur_entries, 0, 0 );
#else /* !NEW_LOGGING */
- Debug( LDAP_DEBUG_ANY,
- "STALE QUERY REMOVED, CACHE ="
- "%d entries\n",
- cm->cur_entries, 0, 0 );
+ Debug( LDAP_DEBUG_ANY,
+ "STALE QUERY REMOVED, CACHE ="
+ "%d entries\n",
+ cm->cur_entries, 0, 0 );
#endif /* !NEW_LOGGING */
- query_prev = query;
- query = query->prev;
- free_query(query_prev);
- }
- ldap_pvt_thread_mutex_unlock(&cm->remove_mutex);
+ query_prev = query;
+ query = query->prev;
+ free_query(query_prev);
+ }
+ ldap_pvt_thread_mutex_unlock(&cm->remove_mutex);
+ }
+ /* If there were no queries, defer processing for a while */
+ if ( pause ) {
+ ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+ cm->cc_paused = 1;
+ if ( ldap_pvt_runqueue_isrunning( &syncrepl_rq, rtask )) {
+ ldap_pvt_runqueue_stoptask( &syncrepl_rq, rtask );
}
+ ldap_pvt_runqueue_resched( &syncrepl_rq, rtask, 1 );
+ ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
}
+ return NULL;
}
cm->max_entries = 0;
cm->cur_entries = 0;
cm->max_queries = 10000;
- cm->cc_thread_started = 0;
cm->cc_period = 1000;
+ cm->cc_paused = 0;
qm->attr_sets = NULL;
qm->templates = NULL;
rc = cm->bi->bi_db_open( be );
be->be_private = private;
}
+
+ ldap_pvt_thread_mutex_lock( &syncrepl_rq.rq_mutex );
+ ldap_pvt_runqueue_insert( &syncrepl_rq, cm->cc_period,
+ consistency_check, on );
+ ldap_pvt_thread_mutex_unlock( &syncrepl_rq.rq_mutex );
+
return rc;
}