#include "portable.h"
-#define SLAPD_OVER_SSSVLV SLAPD_MOD_DYNAMIC
-
#ifdef SLAPD_OVER_SSSVLV
#include <stdio.h>
#include "slap.h"
#include "lutil.h"
+#include "config.h"
+
+#include "../../../libraries/liblber/lber-int.h" /* ber_rewind */
/* RFC2891: Server Side Sorting
* RFC2696: Paged Results
#define SAFESTR(macro_str, macro_def) ((macro_str) ? (macro_str) : (macro_def))
+#define SSSVLV_DEFAULT_MAX_KEYS 5
+
typedef struct vlv_ctrl {
int vc_before;
int vc_after;
typedef struct sort_ctrl {
int sc_nkeys;
- sort_key sc_keys[0];
+ sort_key sc_keys[1];
} sort_ctrl;
/* There is only one conn table for all overlay instances */
static sort_op **sort_conns;
static ldap_pvt_thread_mutex_t sort_conns_mutex;
+static int ov_count;
static const char *debug_header = "sssvlv";
static int sss_cid;
mr = sc->sc_keys[i].sk_ordering;
mr->smr_match( &cmp, 0, mr->smr_syntax, mr,
&sn1->sn_vals[i], &sn2->sn_vals[i] );
+ if ( cmp )
+ cmp *= sc->sc_keys[i].sk_direction;
}
}
+ return cmp;
+}
+
+static int node_insert( const void *val1, const void *val2 )
+{
/* Never return equal so that new entries are always inserted */
- return cmp < 0 ? -1 : 1;
+ return node_cmp( val1, val2 ) < 0 ? -1 : 1;
}
static int pack_vlv_response_control(
LDAPControl *ctrl;
BerElementBuffer berbuf;
BerElement *ber = (BerElement *)&berbuf;
- PagedResultsCookie resp_cookie;
struct berval cookie, bv;
int rc;
ch_free( so );
}
+static void send_list(
+ Operation *op,
+ SlapReply *rs,
+ sort_op *so)
+{
+ Avlnode *cur_node, *tmp_node;
+ vlv_ctrl *vc = op->o_controls[vlv_cid];
+ int i, j, dir, rc;
+ BackendDB *be;
+ Entry *e;
+ LDAPControl *ctrls[2];
+
+ /* FIXME: it may be better to just flatten the tree into
+ * an array before doing all of this...
+ */
+
+ /* Are we just counting an offset? */
+ if ( BER_BVISNULL( &vc->vc_value )) {
+ if ( vc->vc_offset == vc->vc_count ) {
+ /* wants the last entry in the list */
+ cur_node = tavl_end(so->so_tree, TAVL_DIR_RIGHT);
+ so->so_vlv_target = so->so_nentries;
+ } else if ( vc->vc_offset == 1 ) {
+ /* wants the first entry in the list */
+ cur_node = tavl_end(so->so_tree, TAVL_DIR_LEFT);
+ so->so_vlv_target = 1;
+ } else {
+ int target;
+ /* Just iterate to the right spot */
+ if ( vc->vc_count && vc->vc_count != so->so_nentries ) {
+ if ( vc->vc_offset > vc->vc_count )
+ goto range_err;
+ target = so->so_nentries * vc->vc_offset / vc->vc_count;
+ } else {
+ if ( vc->vc_offset > so->so_nentries ) {
+range_err:
+ so->so_vlv_rc = LDAP_VLV_RANGE_ERROR;
+ pack_vlv_response_control( op, rs, so, ctrls );
+ ctrls[1] = NULL;
+ slap_add_ctrls( op, rs, ctrls );
+ rs->sr_err = LDAP_VLV_ERROR;
+ return;
+ }
+ target = vc->vc_offset;
+ }
+ so->so_vlv_target = target;
+ /* Start at left and go right, or start at right and go left? */
+ if ( target < so->so_nentries / 2 ) {
+ cur_node = tavl_end(so->so_tree, TAVL_DIR_LEFT);
+ dir = TAVL_DIR_RIGHT;
+ } else {
+ cur_node = tavl_end(so->so_tree, TAVL_DIR_RIGHT);
+ dir = TAVL_DIR_LEFT;
+ target = so->so_nentries - target + 1;
+ }
+ for ( i=1; i<target; i++ )
+ cur_node = tavl_next( cur_node, dir );
+ }
+ } else {
+ /* we're looking for a specific value */
+ sort_ctrl *sc = so->so_ctrl;
+ MatchingRule *mr = sc->sc_keys[0].sk_ordering;
+ sort_node *sn;
+ struct berval bv;
+
+ if ( mr->smr_normalize ) {
+ rc = mr->smr_normalize( SLAP_MR_VALUE_OF_SYNTAX,
+ mr->smr_syntax, mr, &vc->vc_value, &bv, op->o_tmpmemctx );
+ if ( rc ) {
+ so->so_vlv_rc = LDAP_INAPPROPRIATE_MATCHING;
+ pack_vlv_response_control( op, rs, so, ctrls );
+ ctrls[1] = NULL;
+ slap_add_ctrls( op, rs, ctrls );
+ rs->sr_err = LDAP_VLV_ERROR;
+ return;
+ }
+ } else {
+ bv = vc->vc_value;
+ }
+
+ sn = op->o_tmpalloc( sizeof(sort_node) +
+ sc->sc_nkeys * sizeof(struct berval), op->o_tmpmemctx );
+ sn->sn_vals = (struct berval *)(sn+1);
+ sn->sn_conn = op->o_conn->c_conn_idx;
+ sn->sn_vals[0] = bv;
+ for (i=1; i<sc->sc_nkeys; i++) {
+ BER_BVZERO( &sn->sn_vals[i] );
+ }
+ cur_node = tavl_find3( so->so_tree, sn, node_cmp, &j );
+ /* didn't find >= match */
+ if ( j > 0 )
+ cur_node = NULL;
+ op->o_tmpfree( sn, op->o_tmpmemctx );
+
+ if ( !cur_node ) {
+ so->so_vlv_target = so->so_nentries + 1;
+ } else {
+ sort_node *sn = so->so_tree->avl_data;
+ /* start from the left or the right side? */
+ mr->smr_match( &i, 0, mr->smr_syntax, mr, &bv, &sn->sn_vals[0] );
+ if ( i > 0 ) {
+ tmp_node = tavl_end(so->so_tree, TAVL_DIR_RIGHT);
+ dir = TAVL_DIR_LEFT;
+ } else {
+ tmp_node = tavl_end(so->so_tree, TAVL_DIR_LEFT);
+ dir = TAVL_DIR_RIGHT;
+ }
+ for (i=0; tmp_node != cur_node;
+ tmp_node = tavl_next( tmp_node, dir ), i++);
+ so->so_vlv_target = i;
+ }
+ if ( bv.bv_val != vc->vc_value.bv_val )
+ op->o_tmpfree( bv.bv_val, op->o_tmpmemctx );
+ }
+ if ( !cur_node ) {
+ i = 1;
+ cur_node = tavl_end(so->so_tree, TAVL_DIR_RIGHT);
+ } else {
+ i = 0;
+ }
+ for ( ; i<vc->vc_before; i++ ) {
+ tmp_node = tavl_next( cur_node, TAVL_DIR_LEFT );
+ if ( !tmp_node ) break;
+ cur_node = tmp_node;
+ }
+ j = i + vc->vc_after + 1;
+ be = op->o_bd;
+ for ( i=0; i<j; i++ ) {
+ sort_node *sn = cur_node->avl_data;
+
+ op->o_bd = select_backend( &sn->sn_dn, 0 );
+ e = NULL;
+ rc = be_entry_get_rw( op, &sn->sn_dn, NULL, NULL, 0, &e );
+
+ if ( e && rc == LDAP_SUCCESS ) {
+ rs->sr_entry = e;
+ rs->sr_flags = REP_ENTRY_MUSTRELEASE;
+ rs->sr_err = send_search_entry( op, rs );
+ if ( rs->sr_err == LDAP_UNAVAILABLE )
+ break;
+ }
+ cur_node = tavl_next( cur_node, TAVL_DIR_RIGHT );
+ if ( !cur_node ) break;
+ }
+ so->so_vlv_rc = LDAP_SUCCESS;
+
+ op->o_bd = be;
+}
+
static void send_page( Operation *op, SlapReply *rs, sort_op *so )
{
Avlnode *cur_node = so->so_tree;
next_node = tavl_next( cur_node, TAVL_DIR_RIGHT );
-
op->o_bd = select_backend( &sn->sn_dn, 0 );
e = NULL;
rc = be_entry_get_rw( op, &sn->sn_dn, NULL, NULL, 0, &e );
op->o_bd = be;
}
-static int send_entry(
+static void send_entry(
Operation *op,
SlapReply *rs,
sort_op *so)
"%s: response control: status=%d, text=%s\n",
debug_header, rs->sr_err, SAFESTR(rs->sr_text, "<None>"));
+ if ( !so->so_tree )
+ return;
+
/* RFC 2891: If critical then send the entries iff they were
* succesfully sorted. If non-critical send all entries
* whether they were sorted or not.
if ( (op->o_ctrlflag[sss_cid] != SLAP_CONTROL_CRITICAL) ||
(rs->sr_err == LDAP_SUCCESS) )
{
- /* Get the first node to send */
- Avlnode *start_node = tavl_end(so->so_tree, TAVL_DIR_LEFT);
- so->so_tree = start_node;
-
- if ( so->so_paged <= SLAP_CONTROL_IGNORED ) {
- /* Not paged result search. Send all entries.
- * Set the page size to the number of entries
- * so that send_page() will send all entries.
- */
- so->so_page_size = so->so_nentries;
- }
+ if ( so->so_vlv > SLAP_CONTROL_IGNORED ) {
+ send_list( op, rs, so );
+ } else {
+ /* Get the first node to send */
+ Avlnode *start_node = tavl_end(so->so_tree, TAVL_DIR_LEFT);
+ so->so_tree = start_node;
+
+ if ( so->so_paged <= SLAP_CONTROL_IGNORED ) {
+ /* Not paged result search. Send all entries.
+ * Set the page size to the number of entries
+ * so that send_page() will send all entries.
+ */
+ so->so_page_size = so->so_nentries;
+ }
- send_page( op, rs, so );
+ send_page( op, rs, so );
+ }
}
-
- return SLAP_CB_CONTINUE;
}
static void send_result(
sort_op *so)
{
LDAPControl *ctrls[3];
- int rc;
+ int rc, i = 0;
rc = pack_sss_response_control( op, rs, ctrls );
- if ( rc == LDAP_SUCCESS && so->so_paged > SLAP_CONTROL_IGNORED ) {
- rc = pack_pagedresult_response_control( op, rs, so, ctrls+1 );
- ctrls[2] = NULL;
- } else {
- ctrls[1] = NULL;
+ if ( rc == LDAP_SUCCESS ) {
+ i++;
+ rc = -1;
+ if ( so->so_paged > SLAP_CONTROL_IGNORED ) {
+ rc = pack_pagedresult_response_control( op, rs, so, ctrls+1 );
+ } else if ( so->so_vlv > SLAP_CONTROL_IGNORED ) {
+ rc = pack_vlv_response_control( op, rs, so, ctrls+1 );
+ }
+ if ( rc == LDAP_SUCCESS )
+ i++;
}
+ ctrls[i] = NULL;
if ( ctrls[0] != NULL )
slap_add_ctrls( op, rs, ctrls );
{
sort_ctrl *sc = op->o_controls[sss_cid];
sort_op *so = op->o_callback->sc_private;
- int rc = SLAP_CB_CONTINUE;
if ( rs->sr_type == REP_SEARCH ) {
int i;
sn2->sn_vals = (struct berval *)(sn2+1);
AC_MEMCPY( sn2->sn_vals, sn->sn_vals,
sc->sc_nkeys * sizeof(struct berval));
- sn = sn2;
- ptr = (char *)(sn->sn_vals + sc->sc_nkeys);
- sn->sn_dn.bv_val = ptr;
- sn->sn_dn.bv_len = rs->sr_entry->e_nname.bv_len;
+ ptr = (char *)(sn2->sn_vals + sc->sc_nkeys);
+ sn2->sn_dn.bv_val = ptr;
+ sn2->sn_dn.bv_len = rs->sr_entry->e_nname.bv_len;
AC_MEMCPY( ptr, rs->sr_entry->e_nname.bv_val,
rs->sr_entry->e_nname.bv_len );
ptr += rs->sr_entry->e_nname.bv_len;
*ptr++ = '\0';
for ( i=0; i<sc->sc_nkeys; i++ ) {
- if ( !BER_BVISNULL( &sn->sn_vals[i] )) {
- AC_MEMCPY( ptr, sn->sn_vals[i].bv_val, sn->sn_vals[i].bv_len );
- sn->sn_vals[i].bv_val = ptr;
- ptr += sn->sn_vals[i].bv_len;
+ if ( !BER_BVISNULL( &sn2->sn_vals[i] )) {
+ AC_MEMCPY(ptr, sn2->sn_vals[i].bv_val, sn2->sn_vals[i].bv_len);
+ sn2->sn_vals[i].bv_val = ptr;
+ ptr += sn2->sn_vals[i].bv_len;
*ptr++ = '\0';
}
}
+ op->o_tmpfree( sn, op->o_tmpmemctx );
+ sn = sn2;
sn->sn_conn = op->o_conn->c_conn_idx;
/* Insert into the AVL tree */
- tavl_insert(&(so->so_tree), sn, node_cmp, avl_dup_ok);
+ tavl_insert(&(so->so_tree), sn, node_insert, avl_dup_error);
so->so_nentries++;
/* Collected the keys so that they can be sorted. Thus, stop
* the entry from propagating.
*/
- rc = LDAP_SUCCESS;
+ rs->sr_err = LDAP_SUCCESS;
}
else if ( rs->sr_type == REP_RESULT ) {
/* Remove serversort response callback.
op->o_callback = op->o_callback->sc_next;
}
- rc = send_entry( op, rs, so );
+ send_entry( op, rs, so );
send_result( op, rs, so );
}
- return rc;
+ return rs->sr_err;
}
static int sssvlv_op_search(
int rc = SLAP_CB_CONTINUE;
int ok;
sort_op *so, so2;
- sort_ctrl *sc = op->o_controls[sss_cid];
+ sort_ctrl *sc;
PagedResultsState *ps;
vlv_ctrl *vc;
op->o_req_dn.bv_val, op->ors_filterstr.bv_val,
op->o_ctrlflag[sss_cid]);
+ sc = op->o_controls[sss_cid];
if ( sc->sc_nkeys > si->svi_max_keys ) {
rs->sr_text = "Too many sort keys";
rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
} else if ( !ps->ps_size ) {
/* Abandoning current request */
ok = 0;
+ so->so_nentries = 0;
+ rs->sr_err = LDAP_SUCCESS;
}
}
+ if (( vc && so->so_paged > SLAP_CONTROL_IGNORED ) ||
+ ( ps && so->so_vlv > SLAP_CONTROL_IGNORED )) {
+ /* changed from paged to vlv or vice versa, abandon */
+ ok = 0;
+ so->so_nentries = 0;
+ rs->sr_err = LDAP_UNWILLING_TO_PERFORM;
+ }
/* Are there too many running overall? */
} else if ( si->svi_num >= si->svi_max ) {
ok = 0;
}
ldap_pvt_thread_mutex_unlock( &sort_conns_mutex );
if ( ok ) {
+ /* are we continuing a VLV search? */
+ if ( vc && vc->vc_context ) {
+ so->so_ctrl = sc;
+ send_list( op, rs, so );
+ send_result( op, rs, so );
+ rc = LDAP_SUCCESS;
/* are we continuing a paged search? */
- if ( ps && ps->ps_cookie ) {
+ } else if ( ps && ps->ps_cookie ) {
so->so_ctrl = sc;
send_page( op, rs, so );
send_result( op, rs, so );
rc = LDAP_SUCCESS;
- }
- else {
+ } else {
slap_callback *cb = op->o_tmpalloc( sizeof(slap_callback),
op->o_tmpmemctx );
/* Install serversort response callback to handle a new search */
- if ( ps ) {
+ if ( ps || vc ) {
so = ch_malloc( sizeof(sort_op));
} else {
so = op->o_tmpalloc( sizeof(sort_op), op->o_tmpmemctx );
} else {
so->so_paged = 0;
so->so_page_size = 0;
+ if ( vc ) {
+ so->so_vlv = op->o_ctrlflag[vlv_cid];
+ so->so_vlv_target = 0;
+ so->so_vlv_rc = 0;
+ }
}
+ so->so_vcontext = (unsigned long)so;
so->so_nentries = 0;
op->o_callback = cb;
}
} else {
- if ( ps && !ps->ps_size ) {
+ if ( so && !so->so_nentries ) {
free_sort_op( op->o_conn, so );
- rs->sr_err = LDAP_SUCCESS;
} else {
rs->sr_text = "Other sort requests already in progress";
rs->sr_err = LDAP_BUSY;
i = count_key( ber );
sc = op->o_tmpalloc( sizeof(sort_ctrl) +
- i * sizeof(sort_key), op->o_tmpmemctx );
+ (i-1) * sizeof(sort_key), op->o_tmpmemctx );
sc->sc_nkeys = i;
op->o_controls[sss_cid] = sc;
tag = ber_peek_tag( ber, &len );
if ( tag == LDAP_VLVBYINDEX_IDENTIFIER ) {
- tag = ber_scanf( ber, "ii", &vc2.vc_offset, &vc2.vc_count );
+ tag = ber_scanf( ber, "{ii}", &vc2.vc_offset, &vc2.vc_count );
if ( tag == LBER_ERROR )
return rs->sr_err;
BER_BVZERO( &vc2.vc_value );
BackendDB *be,
ConfigReply *cr )
{
- int rc = overlay_register_control( be, LDAP_CONTROL_SORTREQUEST );
+ slap_overinst *on = (slap_overinst *)be->bd_info;
+ sssvlv_info *si = on->on_bi.bi_private;
+ int rc;
+
+ /* If not set, default to 1/2 of available threads */
+ if ( !si->svi_max )
+ si->svi_max = connection_pool_max / 2;
+
+ rc = overlay_register_control( be, LDAP_CONTROL_SORTREQUEST );
if ( rc == LDAP_SUCCESS )
rc = overlay_register_control( be, LDAP_CONTROL_VLVREQUEST );
return rc;
}
+static ConfigTable sssvlv_cfg[] = {
+ { "sssvlv-max", "num",
+ 2, 2, 0, ARG_INT|ARG_OFFSET,
+ (void *)offsetof(sssvlv_info, svi_max),
+ "( OLcfgOvAt:21.1 NAME 'olcSssVlvMax' "
+ "DESC 'Maximum number of concurrent Sort requests' "
+ "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
+ { "sssvlv-maxkeys", "num",
+ 2, 2, 0, ARG_INT|ARG_OFFSET,
+ (void *)offsetof(sssvlv_info, svi_max_keys),
+ "( OLcfgOvAt:21.2 NAME 'olcSssVlvMaxKeys' "
+ "DESC 'Maximum number of Keys in a Sort request' "
+ "SYNTAX OMsInteger SINGLE-VALUE )", NULL, NULL },
+ { NULL, NULL, 0, 0, 0, ARG_IGNORED }
+};
+
+static ConfigOCs sssvlv_ocs[] = {
+ { "( OLcfgOvOc:21.1 "
+ "NAME 'olcSssVlvConfig' "
+ "DESC 'SSS VLV configuration' "
+ "SUP olcOverlayConfig "
+ "MAY ( olcSssVlvMax $ olcSssVlvMaxKeys ) )",
+ Cft_Overlay, sssvlv_cfg, NULL, NULL },
+ { NULL, 0, NULL }
+};
+
static int sssvlv_db_init(
BackendDB *be,
ConfigReply *cr)
si = (sssvlv_info *)ch_malloc(sizeof(sssvlv_info));
on->on_bi.bi_private = si;
- si->svi_max = 5;
+ si->svi_max = 0;
si->svi_num = 0;
- si->svi_max_keys = 5;
+ si->svi_max_keys = SSSVLV_DEFAULT_MAX_KEYS;
if ( dtblsize && !sort_conns ) {
ldap_pvt_thread_mutex_init( &sort_conns_mutex );
sort_conns = ch_calloc( sizeof(sort_op *), dtblsize + 1 );
sort_conns++;
}
+ ov_count++;
return LDAP_SUCCESS;
}
slap_overinst *on = (slap_overinst *)be->bd_info;
sssvlv_info *si = (sssvlv_info *)on->on_bi.bi_private;
+ ov_count--;
+ if ( !ov_count && sort_conns) {
+ sort_conns--;
+ ch_free(sort_conns);
+ ldap_pvt_thread_mutex_destroy( &sort_conns_mutex );
+ }
+
if ( si ) {
ch_free( si );
on->on_bi.bi_private = NULL;
sssvlv.on_bi.bi_connection_destroy = sssvlv_connection_destroy;
sssvlv.on_bi.bi_op_search = sssvlv_op_search;
+ sssvlv.on_bi.bi_cf_ocs = sssvlv_ocs;
+
+ rc = config_register_schema( sssvlv_cfg, sssvlv_ocs );
+ if ( rc )
+ return rc;
+
rc = register_supported_control2( LDAP_CONTROL_SORTREQUEST,
SLAP_CTRL_SEARCH,
NULL,