]> git.sur5r.net Git - openldap/blobdiff - servers/slapd/back-ndb/ndbio.cpp
Merge remote-tracking branch 'origin/mdb.master'
[openldap] / servers / slapd / back-ndb / ndbio.cpp
index bfca544ecd36308598acf8b04cb895b44e402abd..068df2e13a4a5d9a1ff76bbb691f9690c0050b48 100644 (file)
@@ -2,7 +2,7 @@
 /* $OpenLDAP$ */
 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
  *
- * Copyright 2008 The OpenLDAP Foundation.
+ * Copyright 2008-2012 The OpenLDAP Foundation.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -106,7 +106,7 @@ static int
 ndb_ai_check( struct ndb_info *ni, NdbOcInfo *oci, AttributeType **attrs, char **ptr, int *col,
        int create )
 {
-       NdbAttrInfo *ai, atmp;
+       NdbAttrInfo *ai;
        int i;
 
        for ( i=0; attrs[i]; i++ ) {
@@ -149,6 +149,10 @@ ndb_ai_check( struct ndb_info *ni, NdbOcInfo *oci, AttributeType **attrs, char *
                        return LDAP_OTHER;
                }
 
+               /* An attrset may have already been connected */
+               if (( oci->no_flag & NDB_INFO_ATSET ) && ai->na_oi == oci )
+                       continue;
+
                /* An indexed attr is defined before its OC is */
                if ( !ai->na_oi ) {
                        ai->na_oi = oci;
@@ -172,8 +176,12 @@ ndb_ai_check( struct ndb_info *ni, NdbOcInfo *oci, AttributeType **attrs, char *
                }
 
                if ( create ) {
-                       *ptr += sprintf( *ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
-                               ai->na_len );
+                       if ( ai->na_flag & NDB_INFO_ATBLOB ) {
+                               *ptr += sprintf( *ptr, ", `%s` BLOB", ai->na_attr->sat_cname.bv_val );
+                       } else {
+                               *ptr += sprintf( *ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
+                                       ai->na_len );
+                       }
                }
        }
        return 0;
@@ -187,7 +195,7 @@ ndb_oc_create( struct ndb_info *ni, NdbOcInfo *oci, int create )
 
        if ( create ) {
                ptr = buf + sprintf( buf,
-                       "CREATE TABLE `%s` (eid bigint unsigned NOT NULL PRIMARY KEY",
+                       "CREATE TABLE `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
                        oci->no_table.bv_val );
        }
 
@@ -203,7 +211,7 @@ ndb_oc_create( struct ndb_info *ni, NdbOcInfo *oci, int create )
        /* assume all are present */
        oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
 
-       col = 1;
+       col = 2;
        ldap_pvt_thread_rdwr_wlock( &ni->ni_ai_rwlock );
        if ( oci->no_oc->soc_required ) {
                rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, &ptr, &col, create );
@@ -218,7 +226,7 @@ ndb_oc_create( struct ndb_info *ni, NdbOcInfo *oci, int create )
                oci->no_nattrs * sizeof(struct ndb_attrinfo *));
 
        if ( create ) {
-               ptr = lutil_strcopy( ptr, " ) ENGINE=ndb" );
+               ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
                rc = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
                if ( rc ) {
                        Debug( LDAP_DEBUG_ANY,
@@ -285,8 +293,8 @@ ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
                avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
 
                col = myTable->getNoOfColumns();
-               /* Skip 0, eid */
-               for ( j = 1; j<col; j++ ) {
+               /* Skip 0 and 1, eid and vid */
+               for ( j = 2; j<col; j++ ) {
                        myCol = myTable->getColumn( j );
                        ber_str2bv( myCol->getName(), 0, 0, &bv );
                        ai = ndb_ai_get( ni, &bv );
@@ -296,6 +304,8 @@ ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
                        ai->na_oi = oci;
                        ai->na_column = j;
                        ai->na_len = myCol->getLength();
+                       if ( myCol->getType() == NdbDictionary::Column::Blob )
+                               ai->na_flag |= NDB_INFO_ATBLOB;
                }
        }
        /* Link to any attrsets */
@@ -311,7 +321,7 @@ ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
                /* shouldn't happen */
                if ( !oci )
                        continue;
-               col = 1;
+               col = 2;
                if ( oci->no_oc->soc_required ) {
                        rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, NULL, &col, 0 );
                }
@@ -326,8 +336,8 @@ ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
 }
 
 static int
-ndb_oc_get( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
-       struct berval *oname, NdbOcs *out )
+ndb_oc_list( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
+       struct berval *oname, int implied, NdbOcs *out )
 {
        const NdbDictionary::Table *myTable;
        NdbOcInfo *oci, octmp;
@@ -351,7 +361,7 @@ ndb_oc_get( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
                        int i;
 
                        for ( i=0; oc->soc_sups[i]; i++ ) {
-                               rc = ndb_oc_get( ni, myDict, &oc->soc_sups[i]->soc_cname, out );
+                               rc = ndb_oc_list( ni, myDict, &oc->soc_sups[i]->soc_cname, 1, out );
                                if ( rc ) return rc;
                        }
                }
@@ -362,8 +372,17 @@ ndb_oc_get( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
        for ( i=0; i<out->no_ntext; i++ )
                if ( out->no_text[i].bv_val == oc->soc_cname.bv_val )
                        break;
-       if ( i == out->no_ntext )
-               out->no_text[out->no_ntext++] = oc->soc_cname;
+       if ( i == out->no_ntext ) {
+               for ( i=0; i<out->no_nitext; i++ )
+                       if ( out->no_itext[i].bv_val == oc->soc_cname.bv_val )
+                               break;
+               if ( i == out->no_nitext ) {
+                       if ( implied )
+                               out->no_itext[out->no_nitext++] = oc->soc_cname;
+                       else
+                               out->no_text[out->no_ntext++] = oc->soc_cname;
+               }
+       }
 
        /* ignore top, etc... */
        if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
@@ -434,6 +453,9 @@ ndb_aset_get( struct ndb_info *ni, struct berval *sname, struct berval *attrs, N
        oci->no_oc->soc_cname = oci->no_name;
        oci->no_flag = NDB_INFO_ATSET;
 
+       if ( !ber_bvcmp( sname, &slap_schema.si_oc_extensibleObject->soc_cname ))
+               oci->no_oc->soc_kind = slap_schema.si_oc_extensibleObject->soc_kind;
+
        rc = ndb_oc_create( ni, oci, 0 );
        if ( !rc )
                rc = avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
@@ -453,7 +475,7 @@ ndb_aset_create( struct ndb_info *ni, NdbOcInfo *oci )
        int i;
 
        ptr = buf + sprintf( buf,
-               "CREATE TABLE IF NOT EXISTS `%s` (eid bigint unsigned NOT NULL PRIMARY KEY",
+               "CREATE TABLE IF NOT EXISTS `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
                oci->no_table.bv_val );
 
        for ( i=0; i<oci->no_nattrs; i++ ) {
@@ -466,7 +488,7 @@ ndb_aset_create( struct ndb_info *ni, NdbOcInfo *oci )
                        ptr += sprintf( ptr, ", INDEX (`%s`)", ai->na_attr->sat_cname.bv_val );
                }
        }
-       ptr = lutil_strcopy( ptr, " ) ENGINE=ndb" );
+       ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
        i = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
        if ( i ) {
                Debug( LDAP_DEBUG_ANY,
@@ -487,6 +509,7 @@ ndb_oc_check( BackendDB *be, Ndb *ndb,
 
        out->no_ninfo = 0;
        out->no_ntext = 0;
+       out->no_nitext = 0;
 
        /* Find all objectclasses and their superiors. List
         * the superiors first.
@@ -494,17 +517,20 @@ ndb_oc_check( BackendDB *be, Ndb *ndb,
 
        ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
        for ( i=0; !BER_BVISNULL( &ocsin[i] ); i++ ) {
-               rc = ndb_oc_get( ni, myDict, &ocsin[i], out );
+               rc = ndb_oc_list( ni, myDict, &ocsin[i], 0, out );
                if ( rc ) break;
        }
        ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
        return rc;
 }
 
+#define        V_INS   1
+#define        V_DEL   2
+#define        V_REP   3
+
+static int ndb_flush_blobs;
+
 /* set all the unique attrs of this objectclass into the table
- * max row size is 8192 bytes; how do we detect if the row is too large?
- *
- * FIXME: Currently only stores the first value of any multivalued attribute.
  */
 extern "C" int
 ndb_oc_attrs(
@@ -514,85 +540,166 @@ ndb_oc_attrs(
        NdbOcInfo *no,
        NdbAttrInfo **attrs,
        int nattrs,
-       int update,
-       int *num,
-       NdbOperation **retop
+       Attribute *old
 )
 {
        char buf[65538], *ptr;
-       Attribute *a;
-       NdbOperation *myop = retop ? *retop : NULL;
-       int i;
+       Attribute **an, **ao, *a;
+       NdbOperation *myop;
+       int i, j, max = 0;
+       int changed, rc;
+       Uint64 eid = e->e_id;
+
+       if ( !nattrs )
+               return 0;
 
+       an = (Attribute **)ch_malloc( 2 * nattrs * sizeof(Attribute *));
+       ao = an + nattrs;
+
+       /* Turn lists of attrs into arrays for easier access */
        for ( i=0; i<nattrs; i++ ) {
-               /* Skip if not in this table */
-               if ( attrs[i]->na_oi != no )
+               if ( attrs[i]->na_oi != no ) {
+                       an[i] = NULL;
+                       ao[i] = NULL;
                        continue;
+               }
                for ( a=e->e_attrs; a; a=a->a_next ) {
-                       if ( a->a_desc->ad_type == attrs[i]->na_attr )
+                       if ( a->a_desc == slap_schema.si_ad_objectClass )
+                               continue;
+                       if ( a->a_desc->ad_type == attrs[i]->na_attr ) {
+                               /* Don't process same attr twice */
+                               if ( a->a_flags & SLAP_ATTR_IXADD )
+                                       a = NULL;
+                               else
+                                       a->a_flags |= SLAP_ATTR_IXADD;
                                break;
+                       }
                }
-               /* If we found a match, set its value. If we found no match
-                * and we're updating, delete its value.
-                */
-               if ( a || update ) {
-                       /* objectclass is in dn_idx_table */
-                       if ( a && a->a_desc == slap_schema.si_ad_objectClass )
+               an[i] = a;
+               if ( a && a->a_numvals > max )
+                       max = a->a_numvals;
+               for ( a=old; a; a=a->a_next ) {
+                       if ( a->a_desc == slap_schema.si_ad_objectClass )
                                continue;
+                       if ( a->a_desc->ad_type == attrs[i]->na_attr )
+                               break;
+               }
+               ao[i] = a;
+               if ( a && a->a_numvals > max )
+                       max = a->a_numvals;
+       }
 
-                       /* First attr, get the op, set the type and primary key */
-                       if ( !*num ) {
-                               Uint64 eid = e->e_id;
-                               myop = txn->getNdbOperation( myTable );
-                               if ( !myop )
-                                       return LDAP_OTHER;
-                               if ( update ) {
-                                       if ( myop->writeTuple())
-                                               return LDAP_OTHER;
+       for ( i=0; i<max; i++ ) {
+               myop = NULL;
+               for ( j=0; j<nattrs; j++ ) {
+                       if ( !an[j] && !ao[j] )
+                               continue;
+                       changed = 0;
+                       if ( an[j] && an[j]->a_numvals > i ) {
+                               /* both old and new are present, compare for changes */
+                               if ( ao[j] && ao[j]->a_numvals > i ) {
+                                       if ( ber_bvcmp( &ao[j]->a_nvals[i], &an[j]->a_nvals[i] ))
+                                               changed = V_REP;
                                } else {
-                                       if ( myop->insertTuple())
-                                               return LDAP_OTHER;
+                                       changed = V_INS;
                                }
-                               if ( myop->equal( EID_COLUMN, eid ))
-                                       return LDAP_OTHER;
+                       } else {
+                               if ( ao[j] && ao[j]->a_numvals > i )
+                                       changed = V_DEL;
                        }
-                       ptr = buf;
-                       if ( a ) {
-                               if ( a->a_vals[0].bv_len > attrs[i]->na_len ) {
-                                       Debug( LDAP_DEBUG_ANY, "ndb_oc_attrs: attribute %s too long for column\n",
-                                               attrs[i]->na_name.bv_val, 0, 0 );
-                                       return LDAP_CONSTRAINT_VIOLATION;
+                       if ( changed ) {
+                               if ( !myop ) {
+                                       rc = LDAP_OTHER;
+                                       myop = txn->getNdbOperation( myTable );
+                                       if ( !myop ) {
+                                               goto done;
+                                       }
+                                       if ( old ) {
+                                               if ( myop->writeTuple()) {
+                                                       goto done;
+                                               }
+                                       } else {
+                                               if ( myop->insertTuple()) {
+                                                       goto done;
+                                               }
+                                       }
+                                       if ( myop->equal( EID_COLUMN, eid )) {
+                                               goto done;
+                                       }
+                                       if ( myop->equal( VID_COLUMN, i )) {
+                                               goto done;
+                                       }
                                }
-                               *ptr++ = a->a_vals[0].bv_len & 0xff;
-                               if ( attrs[i]->na_len > 255 ) {
-                                       /* MedVar */
-                                       *ptr++ = a->a_vals[0].bv_len >> 8;
+                               if ( attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
+                                       NdbBlob *myBlob = myop->getBlobHandle( attrs[j]->na_column );
+                                       rc = LDAP_OTHER;
+                                       if ( !myBlob ) {
+                                               Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: getBlobHandle failed %s (%d)\n",
+                                                       myop->getNdbError().message, myop->getNdbError().code, 0 );
+                                               goto done;
+                                       }
+                                       if ( slapMode & SLAP_TOOL_MODE )
+                                               ndb_flush_blobs = 1;
+                                       if ( changed & V_INS ) {
+                                               if ( myBlob->setValue( an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len )) {
+                                                       Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
+                                                               myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
+                                                       goto done;
+                                               }
+                                       } else {
+                                               if ( myBlob->setValue( NULL, 0 )) {
+                                                       Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
+                                                               myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
+                                                       goto done;
+                                               }
+                                       }
+                               } else {
+                                       if ( changed & V_INS ) {
+                                               if ( an[j]->a_vals[i].bv_len > attrs[j]->na_len ) {
+                                                       Debug( LDAP_DEBUG_ANY, "ndb_oc_attrs: attribute %s too long for column\n",
+                                                               attrs[j]->na_name.bv_val, 0, 0 );
+                                                       rc = LDAP_CONSTRAINT_VIOLATION;
+                                                       goto done;
+                                               }
+                                               ptr = buf;
+                                               *ptr++ = an[j]->a_vals[i].bv_len & 0xff;
+                                               if ( attrs[j]->na_len > 255 ) {
+                                                       /* MedVar */
+                                                       *ptr++ = an[j]->a_vals[i].bv_len >> 8;
+                                               }
+                                               memcpy( ptr, an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len );
+                                               ptr = buf;
+                                       } else {
+                                               ptr = NULL;
+                                       }
+                                       if ( myop->setValue( attrs[j]->na_column, ptr )) {
+                                               rc = LDAP_OTHER;
+                                               goto done;
+                                       }
                                }
-                               memcpy( ptr, a->a_vals[0].bv_val, a->a_vals[0].bv_len );
-                               ptr = buf;
-                       } else {
-                               ptr = NULL;
                        }
-                       if ( myop->setValue( attrs[i]->na_column, ptr ))
-                               return LDAP_OTHER;
-                       (*num)++;
                }
        }
-       if ( retop )
-               *retop = myop;
-       return LDAP_SUCCESS;
+       rc = LDAP_SUCCESS;
+done:
+       ch_free( an );
+       if ( rc ) {
+               Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: failed %s (%d)\n",
+                       myop->getNdbError().message, myop->getNdbError().code, 0 );
+       }
+       return rc;
 }
 
 static int
 ndb_oc_put(
        const NdbDictionary::Dictionary *myDict,
-       NdbTransaction *txn, NdbOcInfo *no, Entry *e, int update )
+       NdbTransaction *txn, NdbOcInfo *no, Entry *e )
 {
        const NdbDictionary::Table *myTable;
        int i, rc;
 
        for ( i=0; i<no->no_nsets; i++ ) {
-               rc = ndb_oc_put( myDict, txn, no->no_sets[i], e, update );
+               rc = ndb_oc_put( myDict, txn, no->no_sets[i], e );
                if ( rc )
                        return rc;
        }
@@ -601,23 +708,19 @@ ndb_oc_put(
        if ( !myTable )
                return LDAP_OTHER;
 
-       i = 0;
-       return ndb_oc_attrs( txn, myTable, e, no, no->no_attrs, no->no_nattrs, update, &i, NULL );
+       return ndb_oc_attrs( txn, myTable, e, no, no->no_attrs, no->no_nattrs, NULL );
 }
 
+/* This is now only used for Adds. Modifies call ndb_oc_attrs directly. */
 extern "C" int
 ndb_entry_put_data(
        BackendDB *be,
-       NdbArgs *NA,
-       int update
+       NdbArgs *NA
 )
 {
        struct ndb_info *ni = (struct ndb_info *) be->be_private;
-       ObjectClass *oc;
-       Attribute *aoc, *a;
+       Attribute *aoc;
        const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
-       const NdbDictionary::Table *myTable;
-       NdbOperation *myop;
        NdbOcs myOcs;
        int i, rc;
 
@@ -631,25 +734,39 @@ ndb_entry_put_data(
 
        /* Walk thru objectclasses, find all the attributes belonging to a class */
        for ( i=0; i<myOcs.no_ninfo; i++ ) {
-               rc = ndb_oc_put( myDict, NA->txn, myOcs.no_info[i], NA->e, update );
+               rc = ndb_oc_put( myDict, NA->txn, myOcs.no_info[i], NA->e );
                if ( rc ) return rc;
        }
 
+       /* slapadd tries to batch multiple entries per txn, but entry data is
+        * transient and blob data is required to remain valid for the whole txn.
+        * So we need to flush blobs before their source data disappears.
+        */
+       if (( slapMode & SLAP_TOOL_MODE ) && ndb_flush_blobs )
+               NA->txn->execute( NdbTransaction::NoCommit );
+
        return 0;
 }
 
 static void
-ndb_oc_get( NdbOcInfo *no, int *j, int *nocs, NdbOcInfo ***oclist )
+ndb_oc_get( Operation *op, NdbOcInfo *no, int *j, int *nocs, NdbOcInfo ***oclist )
 {
        int i;
        NdbOcInfo  **ol2;
 
        for ( i=0; i<no->no_nsets; i++ ) {
-               ndb_oc_get( no->no_sets[i], j, nocs, oclist );
+               ndb_oc_get( op, no->no_sets[i], j, nocs, oclist );
        }
+
+       /* Don't insert twice */
+       ol2 = *oclist;
+       for ( i=0; i<*j; i++ )
+               if ( ol2[i] == no )
+                       return;
+
        if ( *j >= *nocs ) {
                *nocs *= 2;
-               ol2 = (NdbOcInfo **)ch_realloc( *oclist, *nocs * sizeof(NdbOcInfo *));
+               ol2 = (NdbOcInfo **)op->o_tmprealloc( *oclist, *nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
                *oclist = ol2;
        }
        ol2 = *oclist;
@@ -661,38 +778,38 @@ ndb_oc_get( NdbOcInfo *no, int *j, int *nocs, NdbOcInfo ***oclist )
  */
 extern "C" int
 ndb_entry_get_data(
-       BackendDB *be,
+       Operation *op,
        NdbArgs *NA,
        int update
 )
 {
-       struct ndb_info *ni = (struct ndb_info *) be->be_private;
+       struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
        const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
        const NdbDictionary::Table *myTable;
-       NdbOperation *myop;
+       NdbIndexScanOperation **myop = NULL;
        Uint64 eid;
 
-       Attribute *aoc, *a;
+       Attribute *a;
        NdbOcs myOcs;
        NdbOcInfo *oci, **oclist = NULL;
        char abuf[65536], *ptr, **attrs = NULL;
+       struct berval bv[2];
+       int *ocx = NULL;
 
        /* FIXME: abuf should be dynamically allocated */
 
-       int i, j, k, nocs, nattrs, rc = LDAP_OTHER, alen;
-
-       attr_merge( NA->e, slap_schema.si_ad_objectClass, NA->ocs, NULL );
+       int i, j, k, nocs, nattrs, rc = LDAP_OTHER;
 
        eid = NA->e->e_id;
 
-       ndb_oc_check( be, NA->ndb, NA->ocs, &myOcs );
+       ndb_oc_check( op->o_bd, NA->ndb, NA->ocs, &myOcs );
        myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
        nocs = myOcs.no_ninfo;
 
-       oclist = (NdbOcInfo **)ch_calloc( 1, nocs * sizeof(NdbOcInfo *));
+       oclist = (NdbOcInfo **)op->o_tmpcalloc( 1, nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
 
        for ( i=0, j=0; i<myOcs.no_ninfo; i++ ) {
-               ndb_oc_get( myOcs.no_info[i], &j, &nocs, &oclist );
+               ndb_oc_get( op, myOcs.no_info[i], &j, &nocs, &oclist );
        }
 
        nocs = j;
@@ -700,103 +817,207 @@ ndb_entry_get_data(
        for ( i=0; i<nocs; i++ )
                nattrs += oclist[i]->no_nattrs;
 
-       attrs = (char **)ch_malloc( nattrs * sizeof(char *));
+       ocx = (int *)op->o_tmpalloc( nocs * sizeof(int), op->o_tmpmemctx );
+
+       attrs = (char **)op->o_tmpalloc( nattrs * sizeof(char *), op->o_tmpmemctx );
+
+       myop = (NdbIndexScanOperation **)op->o_tmpalloc( nattrs * sizeof(NdbIndexScanOperation *), op->o_tmpmemctx );
 
        k = 0;
        ptr = abuf;
        for ( i=0; i<nocs; i++ ) {
                oci = oclist[i];
-               myTable = myDict->getTable( oci->no_table.bv_val );
 
-               myop = NA->txn->getNdbOperation( myTable );
-               if ( !myop )
+               myop[i] = NA->txn->getNdbIndexScanOperation( "PRIMARY", oci->no_table.bv_val );
+               if ( !myop[i] )
                        goto leave;
-               if ( myop->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead ))
+               if ( myop[i]->readTuples( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead ))
                        goto leave;
-               if ( myop->equal( EID_COLUMN, eid ))
+               if ( myop[i]->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
                        goto leave;
 
                for ( j=0; j<oci->no_nattrs; j++ ) {
                        if ( oci->no_attrs[j]->na_oi != oci )
                                continue;
-                       attrs[k] = ptr;
-                       *ptr++ = 0;
-                       if ( oci->no_attrs[j]->na_len > 255 )
+                       if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
+                               NdbBlob *bi = myop[i]->getBlobHandle( oci->no_attrs[j]->na_column );
+                               attrs[k++] = (char *)bi;
+                       } else {
+                               attrs[k] = ptr;
                                *ptr++ = 0;
-                       ptr += oci->no_attrs[j]->na_len + 1;
-                       myop->getValue( oci->no_attrs[j]->na_column, attrs[k++] );
+                               if ( oci->no_attrs[j]->na_len > 255 )
+                                       *ptr++ = 0;
+                               ptr += oci->no_attrs[j]->na_len + 1;
+                               myop[i]->getValue( oci->no_attrs[j]->na_column, attrs[k++] );
+                       }
                }
+               ocx[i] = k;
        }
-       if ( NA->txn->execute( update ? NdbTransaction::NoCommit : NdbTransaction::Commit,
-               update ? NdbOperation::AO_IgnoreError : NdbOperation::AbortOnError, 1) < 0 )
+       /* Must use IgnoreError, because an entry with multiple objectClasses may not
+        * actually have attributes defined in each class / table.
+        */
+       if ( NA->txn->execute( NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 )
                goto leave;
 
        /* count results */
-       nattrs = 0;
-       k = 0;
        for ( i=0; i<nocs; i++ ) {
-               oci = oclist[i];
-               for ( j=0; j<oci->no_nattrs; j++ ) {
-                       unsigned char *buf;
-                       int len;
-                       if ( oci->no_attrs[j]->na_oi != oci )
-                               continue;
-                       buf = (unsigned char *)attrs[k++];
-                       len = buf[0];
-                       if ( oci->no_attrs[j]->na_len > 255 ) {
-                               /* MedVar */
-                               len |= (buf[1] << 8);
+               if (( j = myop[i]->nextResult(true) )) {
+                       if ( j < 0 ) {
+                               Debug( LDAP_DEBUG_TRACE,
+                                       "ndb_entry_get_data: first nextResult(%d) failed: %s (%d)\n",
+                                       i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
                        }
-                       if ( !len )
-                               continue;
-                       nattrs++;
+                       myop[i] = NULL;
                }
        }
 
-       a = attrs_alloc( nattrs );
-       NA->e->e_attrs->a_next = a;
+       nattrs = 0;
        k = 0;
        for ( i=0; i<nocs; i++ ) {
                oci = oclist[i];
                for ( j=0; j<oci->no_nattrs; j++ ) {
                        unsigned char *buf;
-                       struct berval bv, nbv;
+                       int len;
                        if ( oci->no_attrs[j]->na_oi != oci )
                                continue;
-                       buf = (unsigned char *)attrs[k++];
-                       bv.bv_len = buf[0];
-                       if ( oci->no_attrs[j]->na_len > 255 ) {
-                               /* MedVar */
-                               bv.bv_len |= (buf[1] << 8);
-                               bv.bv_val = (char *)buf+2;
+                       if ( !myop[i] ) {
+                               attrs[k] = NULL;
+                       } else if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
+                               void *vi = attrs[k];
+                               NdbBlob *bi = (NdbBlob *)vi;
+                               int isNull;
+                               bi->getNull( isNull );
+                               if ( !isNull ) {
+                                       nattrs++;
+                               } else {
+                                       attrs[k] = NULL;
+                               }
                        } else {
-                               bv.bv_val = (char *)buf+1;
+                               buf = (unsigned char *)attrs[k];
+                               len = buf[0];
+                               if ( oci->no_attrs[j]->na_len > 255 ) {
+                                       /* MedVar */
+                                       len |= (buf[1] << 8);
+                               }
+                               if ( len ) {
+                                       nattrs++;
+                               } else {
+                                       attrs[k] = NULL;
+                               }
                        }
-                       if ( bv.bv_len == 0 )
+                       k++;
+               }
+       }
+
+       a = attrs_alloc( nattrs+1 );
+       NA->e->e_attrs = a;
+
+       a->a_desc = slap_schema.si_ad_objectClass;
+       a->a_vals = NULL;
+       ber_bvarray_dup_x( &a->a_vals, NA->ocs, NULL );
+       a->a_nvals = a->a_vals;
+       a->a_numvals = myOcs.no_ntext;
+
+       BER_BVZERO( &bv[1] );
+
+       do {
+               a = NA->e->e_attrs->a_next;
+               k = 0;
+               for ( i=0; i<nocs; k=ocx[i], i++ ) {
+                       oci = oclist[i];
+                       for ( j=0; j<oci->no_nattrs; j++ ) {
+                               unsigned char *buf;
+                               struct berval nbv;
+                               if ( oci->no_attrs[j]->na_oi != oci )
+                                       continue;
+                               buf = (unsigned char *)attrs[k++];
+                               if ( !buf )
+                                       continue;
+                               if ( !myop[i] ) {
+                                       a=a->a_next;
+                                       continue;
+                               }
+                               if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
+                                       void *vi = (void *)buf;
+                                       NdbBlob *bi = (NdbBlob *)vi;
+                                       Uint64 len;
+                                       Uint32 len2;
+                                       int isNull;
+                                       bi->getNull( isNull );
+                                       if ( isNull ) {
+                                               a = a->a_next;
+                                               continue;
+                                       }
+                                       bi->getLength( len );
+                                       bv[0].bv_len = len;
+                                       bv[0].bv_val = (char *)ch_malloc( len+1 );
+                                       len2 = len;
+                                       if ( bi->readData( bv[0].bv_val, len2 )) {
+                                               Debug( LDAP_DEBUG_TRACE,
+                                                       "ndb_entry_get_data: blob readData failed: %s (%d), len %d\n",
+                                                       bi->getNdbError().message, bi->getNdbError().code, len2 );
+                                       }
+                                       bv[0].bv_val[len] = '\0';
+                                       ber_bvarray_add_x( &a->a_vals, bv, NULL );
+                               } else {
+                                       bv[0].bv_len = buf[0];
+                                       if ( oci->no_attrs[j]->na_len > 255 ) {
+                                               /* MedVar */
+                                               bv[0].bv_len |= (buf[1] << 8);
+                                               bv[0].bv_val = (char *)buf+2;
+                                               buf[1] = 0;
+                                       } else {
+                                               bv[0].bv_val = (char *)buf+1;
+                                       }
+                                       buf[0] = 0;
+                                       if ( bv[0].bv_len == 0 ) {
+                                               a = a->a_next;
+                                               continue;
+                                       }
+                                       bv[0].bv_val[bv[0].bv_len] = '\0';
+                                       value_add_one( &a->a_vals, bv );
+                               }
+                               a->a_desc = oci->no_attrs[j]->na_desc;
+                               attr_normalize_one( a->a_desc, bv, &nbv, NULL );
+                               a->a_numvals++;
+                               if ( !BER_BVISNULL( &nbv )) {
+                                       ber_bvarray_add_x( &a->a_nvals, &nbv, NULL );
+                               } else if ( !a->a_nvals ) {
+                                       a->a_nvals = a->a_vals;
+                               }
+                               a = a->a_next;
+                       }
+               }
+               k = 0;
+               for ( i=0; i<nocs; i++ ) {
+                       if ( !myop[i] )
                                continue;
-                       bv.bv_val[bv.bv_len] = '\0';
-                       a->a_desc = oci->no_attrs[j]->na_desc;
-                       attr_normalize_one( a->a_desc, &bv, &nbv, NULL );
-                       a->a_vals = NULL;
-                       a->a_nvals = NULL;
-                       a->a_numvals = 1;
-                       value_add_one( &a->a_vals, &bv );
-                       if ( !BER_BVISNULL( &nbv )) {
-                               ber_bvarray_add( &a->a_nvals, &nbv );
+                       if ((j = myop[i]->nextResult(true))) {
+                               if ( j < 0 ) {
+                                       Debug( LDAP_DEBUG_TRACE,
+                                               "ndb_entry_get_data: last nextResult(%d) failed: %s (%d)\n",
+                                               i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
+                               }
+                               myop[i] = NULL;
                        } else {
-                               a->a_nvals = a->a_vals;
+                               k = 1;
                        }
-                       a = a->a_next;
                }
-       }
+       } while ( k );
 
        rc = 0;
 leave:
+       if ( myop ) {
+               op->o_tmpfree( myop, op->o_tmpmemctx );
+       }
        if ( attrs ) {
-               ch_free( attrs );
+               op->o_tmpfree( attrs, op->o_tmpmemctx );
+       }
+       if ( ocx ) {
+               op->o_tmpfree( ocx, op->o_tmpmemctx );
        }
        if ( oclist ) {
-               ch_free( oclist );
+               op->o_tmpfree( oclist, op->o_tmpmemctx );
        }
 
        return rc;
@@ -804,27 +1025,32 @@ leave:
 
 static int
 ndb_oc_del( 
-       const NdbDictionary::Dictionary *myDict,
        NdbTransaction *txn, Uint64 eid, NdbOcInfo *no )
 {
-       const NdbDictionary::Table *myTable;
-       NdbOperation *myop;
+       NdbIndexScanOperation *myop;
        int i, rc;
 
        for ( i=0; i<no->no_nsets; i++ ) {
-               rc = ndb_oc_del( myDict, txn, eid, no->no_sets[i] );
-               if ( rc ) rc;
+               rc = ndb_oc_del( txn, eid, no->no_sets[i] );
+               if ( rc ) return rc;
        }
-       myTable = myDict->getTable( no->no_table.bv_val );
 
-       myop = txn->getNdbOperation( myTable );
+       myop = txn->getNdbIndexScanOperation( "PRIMARY", no->no_table.bv_val );
        if ( !myop )
                return LDAP_OTHER;
-       if ( myop->deleteTuple() )
+       if ( myop->readTuples( NdbOperation::LM_Exclusive ))
                return LDAP_OTHER;
-       if ( myop->equal( EID_COLUMN, eid ))
+       if ( myop->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
                return LDAP_OTHER;
 
+       txn->execute(NoCommit);
+       while ( myop->nextResult(true) == 0) {
+               do {
+                       myop->deleteCurrentTuple();
+               } while (myop->nextResult(false) == 0);
+               txn->execute(NoCommit);
+       }
+
        return 0;
 }
 
@@ -835,9 +1061,6 @@ ndb_entry_del_data(
 )
 {
        struct ndb_info *ni = (struct ndb_info *) be->be_private;
-       const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
-       const NdbDictionary::Table *myTable;
-       NdbOperation *myop;
        Uint64 eid = NA->e->e_id;
        int i;
        NdbOcs myOcs;
@@ -846,7 +1069,7 @@ ndb_entry_del_data(
        myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
 
        for ( i=0; i<myOcs.no_ninfo; i++ ) {
-               if ( ndb_oc_del( myDict, NA->txn, eid, myOcs.no_info[i] ))
+               if ( ndb_oc_del( NA->txn, eid, myOcs.no_info[i] ))
                        return LDAP_OTHER;
        }
 
@@ -956,6 +1179,9 @@ ndb_entry_put_info(
        }
 
        /* Set list of objectClasses */
+       /* List is <sp> <class> <sp> <class> <sp> ... so that
+        * searches for " class " will yield accurate results
+        */
        if ( aoc ) {
                char *ptr, buf[sizeof(MedVar)];
                NdbOcs myOcs;
@@ -963,13 +1189,28 @@ ndb_entry_put_info(
 
                ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
                ptr = buf+2;
+               *ptr++ = ' ';
                for ( i=0; i<myOcs.no_ntext; i++ ) {
                        /* data loss... */
-                       if ( ptr + myOcs.no_text[i].bv_len >= &buf[sizeof(buf)] )
+                       if ( ptr + myOcs.no_text[i].bv_len + 1 >= &buf[sizeof(buf)] )
                                break;
-                       if ( i ) *ptr++ = ' ';
                        ptr = lutil_strcopy( ptr, myOcs.no_text[i].bv_val );
+                       *ptr++ = ' ';
+               }
+
+               /* implicit classes */
+               if ( myOcs.no_nitext ) {
+                       *ptr++ = '@';
+                       *ptr++ = ' ';
+                       for ( i=0; i<myOcs.no_nitext; i++ ) {
+                               /* data loss... */
+                               if ( ptr + myOcs.no_itext[i].bv_len + 1 >= &buf[sizeof(buf)] )
+                                       break;
+                               ptr = lutil_strcopy( ptr, myOcs.no_itext[i].bv_val );
+                               *ptr++ = ' ';
+                       }
                }
+
                i = ptr - buf - 2;
                buf[0] = i & 0xff;
                buf[1] = i >> 8;
@@ -1006,32 +1247,46 @@ extern "C" struct berval *
 ndb_str2bvarray(
        char *str,
        int len,
-       char delim
+       char delim,
+       void *ctx
 )
 {
        struct berval *list, tmp;
        char *beg;
        int i, num;
 
+       while ( *str == delim ) {
+               str++;
+               len--;
+       }
+
+       while ( str[len-1] == delim ) {
+               str[--len] = '\0';
+       }
+
        for ( i = 1, beg = str;; i++ ) {
                beg = strchr( beg, delim );
                if ( !beg )
                        break;
+               if ( beg >= str + len )
+                       break;
                beg++;
        }
 
        num = i;
-       list = (struct berval *)ch_malloc( (num+1)*sizeof(struct berval));
+       list = (struct berval *)slap_sl_malloc( (num+1)*sizeof(struct berval), ctx);
 
        for ( i = 0, beg = str; i<num; i++ ) {
                tmp.bv_val = beg;
                beg = strchr( beg, delim );
+               if ( beg >= str + len )
+                       beg = NULL;
                if ( beg ) {
                        tmp.bv_len = beg - tmp.bv_val;
                } else {
                        tmp.bv_len = len - (tmp.bv_val - str);
                }
-               ber_dupbv( &list[i], &tmp );
+               ber_dupbv_x( &list[i], &tmp, ctx );
                beg++;
        }
 
@@ -1041,12 +1296,23 @@ ndb_str2bvarray(
 
 extern "C" struct berval *
 ndb_ref2oclist(
-       const char *ref
+       const char *ref,
+       void *ctx
 )
 {
+       char *implied;
+
        /* MedVar */
        int len = ref[0] | (ref[1] << 8);
-       return ndb_str2bvarray( (char *)ref+2, len, ' ' );
+
+       /* don't return the implied classes */
+       implied = (char *)memchr( ref+2, '@', len );
+       if ( implied ) {
+               len = implied - ref - 2;
+               *implied = '\0';
+       }
+
+       return ndb_str2bvarray( (char *)ref+2, len, ' ', ctx );
 }
 
 /* Retrieve the DN2ID_TABLE fields. Can call with NULL ocs if just verifying
@@ -1054,19 +1320,19 @@ ndb_ref2oclist(
  */
 extern "C" int
 ndb_entry_get_info(
-       BackendDB *be,
+       Operation *op,
        NdbArgs *NA,
        int update,
        struct berval *matched
 )
 {
-       struct ndb_info *ni = (struct ndb_info *) be->be_private;
+       struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
        const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
        const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
-       NdbOperation *myop;
-       NdbRecAttr *attr1, *attr2;
-       char idbuf[2*sizeof(ID)];
-       char ocbuf[NDB_OC_BUFLEN];
+       NdbOperation *myop[NDB_MAX_RDNS];
+       NdbRecAttr *eid[NDB_MAX_RDNS], *oc[NDB_MAX_RDNS];
+       char idbuf[NDB_MAX_RDNS][2*sizeof(ID)];
+       char ocbuf[NDB_MAX_RDNS][NDB_OC_BUFLEN];
 
        if ( matched ) {
                BER_BVZERO( matched );
@@ -1075,12 +1341,12 @@ ndb_entry_get_info(
                return LDAP_OTHER;
        }
 
-       myop = NA->txn->getNdbOperation( myTable );
-       if ( !myop ) {
+       myop[0] = NA->txn->getNdbOperation( myTable );
+       if ( !myop[0] ) {
                return LDAP_OTHER;
        }
 
-       if ( myop->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead )) {
+       if ( myop[0]->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead )) {
                return LDAP_OTHER;
        }
 
@@ -1088,20 +1354,20 @@ ndb_entry_get_info(
                return LDAP_NO_SUCH_OBJECT;
        }
 
-       if ( ndb_rdns2keys( myop, NA->rdns )) {
+       if ( ndb_rdns2keys( myop[0], NA->rdns )) {
                return LDAP_OTHER;
        }
 
-       attr1 = myop->getValue( EID_COLUMN, idbuf );
-       if ( !attr1 ) {
+       eid[0] = myop[0]->getValue( EID_COLUMN, idbuf[0] );
+       if ( !eid[0] ) {
                return LDAP_OTHER;
        }
 
-       ocbuf[0] = 0;
-       ocbuf[1] = 0;
+       ocbuf[0][0] = 0;
+       ocbuf[0][1] = 0;
        if ( !NA->ocs ) {
-               attr2 = myop->getValue( OCS_COLUMN, ocbuf );
-               if ( !attr2 ) {
+               oc[0] = myop[0]->getValue( OCS_COLUMN, ocbuf[0] );
+               if ( !oc[0] ) {
                        return LDAP_OTHER;
                }
        }
@@ -1110,21 +1376,20 @@ ndb_entry_get_info(
                return LDAP_OTHER;
        }
 
-       switch( myop->getNdbError().code ) {
+       switch( myop[0]->getNdbError().code ) {
        case 0:
-               if ( !attr1->isNULL() && ( NA->e->e_id = attr1->u_64_value() )) {
+               if ( !eid[0]->isNULL() && ( NA->e->e_id = eid[0]->u_64_value() )) {
                        /* If we didn't care about OCs, or we got them */
-                       if ( NA->ocs || ocbuf[0] || ocbuf[1] ) {
+                       if ( NA->ocs || ocbuf[0][0] || ocbuf[0][1] ) {
                                /* If wanted, return them */
                                if ( !NA->ocs )
-                                       NA->ocs = ndb_ref2oclist( ocbuf );
+                                       NA->ocs = ndb_ref2oclist( ocbuf[0], op->o_tmpmemctx );
                                break;
                        }
                }
                /* FALLTHRU */
        case NDB_NO_SUCH_OBJECT:        /* no such tuple: look for closest parent */
                if ( matched ) {
-                       NdbOperation *ops[NDB_MAX_RDNS];
                        int i, j, k;
                        char dummy[2] = {0,0};
 
@@ -1132,30 +1397,48 @@ ndb_entry_get_info(
                        k = NA->rdns->nr_num - 1;
 
                        for ( i=0; i<k; i++ ) {
-                               ops[i] = NA->txn->getNdbOperation( myTable );
-                               if ( !ops[i] )
+                               myop[i] = NA->txn->getNdbOperation( myTable );
+                               if ( !myop[i] )
                                        return LDAP_OTHER;
-                               if ( ops[i]->readTuple( NdbOperation::LM_CommittedRead ))
+                               if ( myop[i]->readTuple( NdbOperation::LM_CommittedRead ))
                                        return LDAP_OTHER;
                                for ( j=0; j<=i; j++ ) {
-                                       if ( ops[i]->equal( j+RDN_COLUMN, NA->rdns->nr_buf[j] ))
+                                       if ( myop[i]->equal( j+RDN_COLUMN, NA->rdns->nr_buf[j] ))
                                                return LDAP_OTHER;
                                }
                                for ( ;j<NDB_MAX_RDNS; j++ ) {
-                                       if ( ops[i]->equal( j+RDN_COLUMN, dummy ))
+                                       if ( myop[i]->equal( j+RDN_COLUMN, dummy ))
+                                               return LDAP_OTHER;
+                               }
+                               eid[i] = myop[i]->getValue( EID_COLUMN, idbuf[i] );
+                               if ( !eid[i] ) {
+                                       return LDAP_OTHER;
+                               }
+                               ocbuf[i][0] = 0;
+                               ocbuf[i][1] = 0;
+                               if ( !NA->ocs ) {
+                                       oc[i] = myop[0]->getValue( OCS_COLUMN, ocbuf[i] );
+                                       if ( !oc[i] ) {
                                                return LDAP_OTHER;
+                                       }
                                }
                        }
                        if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
                                return LDAP_OTHER;
                        }
                        for ( --i; i>=0; i-- ) {
-                               if ( ops[i]->getNdbError().code == 0 ) {
+                               if ( myop[i]->getNdbError().code == 0 ) {
                                        for ( j=0; j<=i; j++ )
                                                matched->bv_len += NA->rdns->nr_buf[j][0];
+                                       NA->erdns = NA->rdns->nr_num;
+                                       NA->rdns->nr_num = j;
                                        matched->bv_len += i;
                                        matched->bv_val = NA->e->e_name.bv_val +
                                                NA->e->e_name.bv_len - matched->bv_len;
+                                       if ( !eid[i]->isNULL() )
+                                               NA->e->e_id = eid[i]->u_64_value();
+                                       if ( !NA->ocs )
+                                               NA->ocs = ndb_ref2oclist( ocbuf[i], op->o_tmpmemctx );
                                        break;
                                }
                        }
@@ -1188,8 +1471,6 @@ ndb_entry_del_info(
        if ( ndb_rdns2keys( myop, NA->rdns ))
                return LDAP_OTHER;
 
-       /* Let caller invoke the roundtrip */
-       /* return txn->execute(NoCommit); */
        return 0;
 }
 
@@ -1298,12 +1579,12 @@ ndb_entry_get(
                rdns.nr_num = 0;
                NA.ocs = NULL;
                NA.rdns = &rdns;
-               rc = ndb_entry_get_info( op->o_bd, &NA, rw, NULL );
+               rc = ndb_entry_get_info( op, &NA, rw, NULL );
        }
        if ( rc == 0 ) {
                e.e_name = *ndn;
                e.e_nname = *ndn;
-               rc = ndb_entry_get_data( op->o_bd, &NA, 0 );
+               rc = ndb_entry_get_data( op, &NA, 0 );
                ber_bvarray_free( NA.ocs );
                if ( rc == 0 ) {
                        if ( oc && !is_entry_objectclass_or_sub( &e, oc )) {
@@ -1354,3 +1635,43 @@ ndb_trans_backoff( int num_retries )
        timeout.tv_usec = delay % 1000000;
        select( 0, NULL, NULL, NULL, &timeout );
 }
+
+extern "C" void
+ndb_check_referral( Operation *op, SlapReply *rs, NdbArgs *NA )
+{
+       struct berval dn, ndn;
+       int i, dif;
+       dif = NA->erdns - NA->rdns->nr_num;
+
+       /* Set full DN of matched into entry */
+       for ( i=0; i<dif; i++ ) {
+               dnParent( &NA->e->e_name, &dn );
+               dnParent( &NA->e->e_nname, &ndn );
+               NA->e->e_name = dn;
+               NA->e->e_nname = ndn;
+       }
+
+       /* return referral only if "disclose" is granted on the object */
+       if ( access_allowed( op, NA->e, slap_schema.si_ad_entry,
+               NULL, ACL_DISCLOSE, NULL )) {
+               Attribute a;
+               for ( i=0; !BER_BVISNULL( &NA->ocs[i] ); i++ );
+               a.a_numvals = i;
+               a.a_desc = slap_schema.si_ad_objectClass;
+               a.a_vals = NA->ocs;
+               a.a_nvals = NA->ocs;
+               a.a_next = NULL;
+               NA->e->e_attrs = &a;
+               if ( is_entry_referral( NA->e )) {
+                       NA->e->e_attrs = NULL;
+                       ndb_entry_get_data( op, NA, 0 );
+                       rs->sr_ref = get_entry_referrals( op, NA->e );
+                       if ( rs->sr_ref ) {
+                               rs->sr_err = LDAP_REFERRAL;
+                               rs->sr_flags |= REP_REF_MUSTBEFREED;
+                       }
+                       attrs_free( NA->e->e_attrs );
+               }
+               NA->e->e_attrs = NULL;
+       }
+}