]> git.sur5r.net Git - openldap/blob - servers/slapd/back-ndb/search.cpp
MySQL NDB Cluster backend (experimental)
[openldap] / servers / slapd / back-ndb / search.cpp
1 /* search.cpp - search operation for the MySQL NDB Cluster backend */
2 /* $OpenLDAP$ */
3 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4  *
5  * Copyright 2008 The OpenLDAP Foundation.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted only as authorized by the OpenLDAP
10  * Public License.
11  *
12  * A copy of this license is available in the file LICENSE in the
13  * top-level directory of the distribution or, alternatively, at
14  * <http://www.OpenLDAP.org/license.html>.
15  */
16 /* ACKNOWLEDGEMENTS:
17  * This work was initially developed by Howard Chu for inclusion
18  * in OpenLDAP Software. This work was sponsored by MySQL.
19  */
20
21 #include "portable.h"
22
23 #include <stdio.h>
24 #include <ac/string.h>
25 #include <ac/errno.h>
26
27 #include "lutil.h"
28
29 #include "back-ndb.h"
30
31 static int
32 ndb_dn2bound(
33         NdbIndexScanOperation *myop,
34         NdbRdns *rdns
35 )
36 {
37         unsigned int i;
38
39         /* Walk thru RDNs */
40         for ( i=0; i<rdns->nr_num; i++ ) {
41                 /* Note: RDN_COLUMN offset not needed here */
42                 if ( myop->setBound( i, NdbIndexScanOperation::BoundEQ, rdns->nr_buf[i] ))
43                         return LDAP_OTHER;
44         }
45         return i;
46 }
47
48 /* Check that all filter terms reside in the same table.
49  *
50  * If any of the filter terms are indexed, then only an IndexScan of the OL_index
51  * will be performed. If none are indexed, but all the terms reside in a single
52  * table, a Scan can be performed with the LDAP filter transformed into a ScanFilter.
53  *
54  * Otherwise, a full scan of the DB must be done with all filtering done by slapd.
55  */
56 static int ndb_filter_check( struct ndb_info *ni, Filter *f,
57         NdbOcInfo **oci, int *indexed, int *ocfilter )
58 {
59         AttributeDescription *ad = NULL;
60         ber_tag_t choice = f->f_choice;
61         int rc = 0, undef = 0;
62
63         if ( choice & SLAPD_FILTER_UNDEFINED ) {
64                 choice &= SLAPD_FILTER_MASK;
65                 undef = 1;
66         }
67         switch( choice ) {
68         case LDAP_FILTER_AND:
69         case LDAP_FILTER_OR:
70         case LDAP_FILTER_NOT:
71                 for ( f = f->f_list; f; f=f->f_next ) {
72                         rc = ndb_filter_check( ni, f, oci, indexed, ocfilter );
73                         if ( rc ) return rc;
74                 }
75                 break;
76         case LDAP_FILTER_PRESENT:
77                 ad = f->f_desc;
78                 break;
79         case LDAP_FILTER_EQUALITY:
80         case LDAP_FILTER_SUBSTRINGS:
81         case LDAP_FILTER_GE:
82         case LDAP_FILTER_LE:
83         case LDAP_FILTER_APPROX:
84                 ad = f->f_av_desc;
85                 break;
86         default:
87                 break;
88         }
89         if ( ad && !undef ) {
90                 NdbAttrInfo *ai;
91                 /* ObjectClass filtering is in dn2id table */
92                 if ( ad == slap_schema.si_ad_objectClass ) {
93                         if ( choice == LDAP_FILTER_EQUALITY )
94                                 (*ocfilter)++;
95                         return 0;
96                 }
97                 ai = ndb_ai_find( ni, ad->ad_type );
98                 if ( ai ) {
99                         if ( ai->na_flag & NDB_INFO_INDEX )
100                                 (*indexed)++;
101                         if ( *oci ) {
102                                 if ( ai->na_oi != *oci )
103                                         rc = -1;
104                         } else {
105                                 *oci = ai->na_oi;
106                         }
107                 }
108         }
109         return rc;
110 }
111
112 static int ndb_filter_set( Operation *op, struct ndb_info *ni, Filter *f, int indexed,
113         NdbIndexScanOperation *scan, NdbScanFilter **sf, int *bounds )
114 {
115         AttributeDescription *ad = NULL;
116         ber_tag_t choice = f->f_choice;
117         int undef = 0;
118
119         if ( choice & SLAPD_FILTER_UNDEFINED ) {
120                 choice &= SLAPD_FILTER_MASK;
121                 undef = 1;
122         }
123         switch( choice ) {
124         case LDAP_FILTER_NOT:
125                 /* no indexing for these */
126                 break;
127         case LDAP_FILTER_OR:
128                 /* FIXME: these bounds aren't right. */
129                 if ( indexed ) {
130                         scan->end_of_bound( (*bounds)++ );
131                 }
132         case LDAP_FILTER_AND:
133                 for ( f = f->f_list; f; f=f->f_next ) {
134                         if ( ndb_filter_set( op, ni, f, indexed, scan, sf, bounds ))
135                                 return -1;
136                 }
137                 break;
138         case LDAP_FILTER_PRESENT:
139                 ad = f->f_desc;
140                 break;
141         case LDAP_FILTER_EQUALITY:
142         case LDAP_FILTER_SUBSTRINGS:
143         case LDAP_FILTER_GE:
144         case LDAP_FILTER_LE:
145         case LDAP_FILTER_APPROX:
146                 ad = f->f_av_desc;
147                 break;
148         default:
149                 break;
150         }
151         if ( ad && !undef ) {
152                 NdbAttrInfo *ai;
153                 /* ObjectClass filtering is in dn2id table */
154                 if ( ad == slap_schema.si_ad_objectClass ) {
155                         return 0;
156                 }
157                 ai = ndb_ai_find( ni, ad->ad_type );
158                 if ( ai ) {
159                         if ( ai->na_flag & NDB_INFO_INDEX ) {
160                                 char *buf, *ptr;
161                                 NdbIndexScanOperation::BoundType bt;
162                                 int rc;
163
164                                 switch(choice) {
165                                 case LDAP_FILTER_PRESENT:
166                                         rc = scan->setBound( ai->na_ixcol - IDX_COLUMN,
167                                                 NdbIndexScanOperation::BoundGT, NULL );
168                                         break;
169                                 case LDAP_FILTER_EQUALITY:
170                                 case LDAP_FILTER_APPROX:
171                                         bt = NdbIndexScanOperation::BoundEQ;
172                                         goto setit;
173                                 case LDAP_FILTER_GE:
174                                         bt = NdbIndexScanOperation::BoundGE;
175                                         goto setit;
176                                 case LDAP_FILTER_LE:
177                                         bt = NdbIndexScanOperation::BoundLE;
178                                 setit:
179                                         rc = f->f_av_value.bv_len+1;
180                                         if ( ai->na_len > 255 )
181                                                 rc++;
182                                         buf = (char *)op->o_tmpalloc( rc, op->o_tmpmemctx );
183                                         rc = f->f_av_value.bv_len;
184                                         buf[0] = rc & 0xff;
185                                         ptr = buf+1;
186                                         if ( ai->na_len > 255 ) {
187                                                 buf[1] = (rc >> 8);
188                                                 ptr++;
189                                         }
190                                         memcpy( ptr, f->f_av_value.bv_val, f->f_av_value.bv_len );
191                                         rc = scan->setBound( ai->na_ixcol - IDX_COLUMN, bt, buf );
192                                         op->o_tmpfree( buf, op->o_tmpmemctx );
193                                         break;
194                                 default:
195                                         break;
196                                 }
197                         } else {
198                         }
199                 }
200         }
201         return 0;
202 }
203
204 static int ndb_oc_search( Operation *op, SlapReply *rs, Ndb *ndb, NdbTransaction *txn,
205         NdbRdns *rbase, NdbOcInfo *oci, int indexed )
206 {
207         struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
208         const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
209         const NdbDictionary::Table *myTable;
210         const NdbDictionary::Index *myIndex;
211         NdbIndexScanOperation *scan;
212         NdbIndexOperation *ixop;
213         NdbScanFilter *sf = NULL;
214         struct berval *ocs, matched;
215         NdbRecAttr *scanID, *scanOC, *scanDN[NDB_MAX_RDNS];
216         char dnBuf[2048], *ptr;
217         NdbRdns rdns;
218         NdbArgs NA;
219         char idbuf[2*sizeof(ID)];
220         char ocbuf[NDB_OC_BUFLEN];
221         int i, rc, bounds;
222         Entry e = {0};
223         Uint64 eid;
224
225         myTable = myDict->getTable( oci->no_table.bv_val );
226         if ( indexed ) { 
227                 scan = txn->getNdbIndexScanOperation( INDEX_NAME, DN2ID_TABLE );
228                 if ( !scan )
229                         return LDAP_OTHER;
230         } else {
231                 myIndex = myDict->getIndex( "eid$unique", DN2ID_TABLE );
232                 if ( !myIndex ) {
233                         Debug( LDAP_DEBUG_ANY, DN2ID_TABLE " eid index is missing!\n", 0, 0, 0 );
234                         rs->sr_err = LDAP_OTHER;
235                         goto leave;
236                 }
237                 scan = (NdbIndexScanOperation *)txn->getNdbScanOperation( myTable );
238                 if ( !scan )
239                         return LDAP_OTHER;
240                 sf = new NdbScanFilter(scan);
241                 if ( !sf )
242                         return LDAP_OTHER;
243                 switch ( op->ors_filter->f_choice ) {
244                 case LDAP_FILTER_AND:
245                 case LDAP_FILTER_OR:
246                 case LDAP_FILTER_NOT:
247                         break;
248                 default:
249                         if ( sf->begin() < 0 ) {
250                                 rc = LDAP_OTHER;
251                                 goto leave;
252                         }
253                 }
254         }
255         scan->readTuples( NdbOperation::LM_CommittedRead );
256
257         bounds = 0;
258         rc = ndb_filter_set( op, ni, op->ors_filter, indexed, scan, &sf, &bounds );
259         if ( rc )
260                 goto leave;
261         
262         scanID = scan->getValue( EID_COLUMN, idbuf );
263         if ( indexed ) {
264                 scanOC = scan->getValue( OCS_COLUMN, ocbuf );
265                 for ( i=0; i<NDB_MAX_RDNS; i++ ) {
266                         rdns.nr_buf[i][0] = '\0';
267                         scanDN[i] = scan->getValue( RDN_COLUMN+i, rdns.nr_buf[i] );
268                 }
269         }
270
271         if ( txn->execute( NdbTransaction::NoCommit, NdbOperation::AbortOnError, 1 )) {
272                 rs->sr_err = LDAP_OTHER;
273                 goto leave;
274         }
275
276         e.e_name.bv_val = dnBuf;
277         NA.e = &e;
278         NA.ndb = ndb;
279         while ( scan->nextResult() == 0 ) {
280                 NdbTransaction *tx2;
281                 if ( op->o_abandon )
282                         break;
283                 eid = scanID->u_64_value();
284                 e.e_id = eid;
285                 if ( !indexed ) {
286                         tx2 = ndb->startTransaction( myTable );
287                         if ( !tx2 ) {
288                                 rs->sr_err = LDAP_OTHER;
289                                 goto leave;
290                         }
291
292                         ixop = tx2->getNdbIndexOperation( myIndex );
293                         if ( !ixop ) {
294                                 tx2->close();
295                                 rs->sr_err = LDAP_OTHER;
296                                 goto leave;
297                         }
298                         ixop->readTuple( NdbOperation::LM_CommittedRead );
299                         ixop->equal( EID_COLUMN, eid );
300
301                         scanOC = ixop->getValue( OCS_COLUMN, ocbuf );
302                         for ( i=0; i<NDB_MAX_RDNS; i++ ) {
303                                 rdns.nr_buf[i][0] = '\0';
304                                 scanDN[i] = ixop->getValue( RDN_COLUMN+i, rdns.nr_buf[i] );
305                         }
306                         rc = tx2->execute( NdbTransaction::Commit, NdbOperation::AbortOnError, 1 );
307                         tx2->close();
308                         if ( rc ) {
309                                 rs->sr_err = LDAP_OTHER;
310                                 goto leave;
311                         }
312                 }
313
314                 ocs = ndb_ref2oclist( ocbuf );
315                 for ( i=0; i<NDB_MAX_RDNS; i++ ) {
316                         if ( scanDN[i]->isNULL() || !rdns.nr_buf[i][0] )
317                                 break;
318                 }
319                 rdns.nr_num = i;
320
321                 /* entry must be subordinate to the base */
322                 if ( i < rbase->nr_num ) {
323                         continue;
324                 }
325
326                 ptr = dnBuf;
327                 for ( --i; i>=0; i-- ) {
328                         char *buf;
329                         int len;
330                         buf = rdns.nr_buf[i];
331                         len = *buf++;
332                         ptr = lutil_strncopy( ptr, buf, len );
333                         if ( i ) *ptr++ = ',';
334                 }
335                 *ptr = '\0';
336                 e.e_name.bv_len = ptr - dnBuf;
337
338                 /* More scope checks */
339                 /* If indexed, these can be moved into the ScanFilter */
340                 switch( op->ors_scope ) {
341                 case LDAP_SCOPE_ONELEVEL:
342                         if ( rdns.nr_num != rbase->nr_num+1 )
343                                 continue;
344                 case LDAP_SCOPE_SUBORDINATE:
345                         if ( rdns.nr_num == rbase->nr_num )
346                                 continue;
347                 case LDAP_SCOPE_SUBTREE:
348                 default:
349                         if ( e.e_name.bv_len <= op->o_req_dn.bv_len ) {
350                                 if ( op->ors_scope != LDAP_SCOPE_SUBTREE ||
351                                         strcasecmp( op->o_req_dn.bv_val, e.e_name.bv_val ))
352                                         continue;
353                         } else if ( strcasecmp( op->o_req_dn.bv_val, e.e_name.bv_val +
354                                 e.e_name.bv_len - op->o_req_dn.bv_len ))
355                                 continue;
356                 }
357
358                 dnNormalize( 0, NULL, NULL, &e.e_name, &e.e_nname, op->o_tmpmemctx );
359                 {
360 #ifdef notdef           /* NDBapi is broken here */
361                         Ndb::Key_part_ptr keys[2];
362                         char xbuf[32];
363                         keys[0].ptr = &eid;
364                         keys[0].len = sizeof(eid);
365                         keys[1].ptr = NULL;
366                         keys[1].len = 0;
367                         tx2 = ndb->startTransaction( myTable, keys, xbuf, sizeof(xbuf));
368 #else
369                         tx2 = ndb->startTransaction( myTable );
370 #endif
371                         if ( !tx2 ) {
372                                 rs->sr_err = LDAP_OTHER;
373                                 goto leave;
374                         }
375                         NA.txn = tx2;
376                         NA.ocs = ocs;
377                         rc = ndb_entry_get_data( op->o_bd, &NA, 0 );
378                         tx2->close();
379                 }
380                 ber_bvarray_free( ocs );
381                 rc = test_filter( op, &e, op->ors_filter );
382                 if ( rc == LDAP_COMPARE_TRUE ) {
383                         rs->sr_entry = &e;
384                         rs->sr_attrs = op->ors_attrs;
385                         rs->sr_flags = 0;
386                         rc = send_search_entry( op, rs );
387                         rs->sr_entry = NULL;
388                 } else {
389                         rc = 0;
390                 }
391                 attrs_free( e.e_attrs );
392                 e.e_attrs = NULL;
393                 op->o_tmpfree( e.e_nname.bv_val, op->o_tmpmemctx );
394                 if ( rc ) break;
395         }
396 leave:
397         if ( sf ) delete sf;
398         return rc;
399 }
400
401 extern "C"
402 int ndb_back_search( Operation *op, SlapReply *rs )
403 {
404         struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
405         NdbTransaction *txn, *tx2;
406         NdbIndexScanOperation *scan;
407         NdbScanFilter *sf = NULL;
408         Entry e = {0};
409         int rc, i, ocfilter, indexed;
410         struct berval matched;
411         NdbRecAttr *scanID, *scanOC, *scanDN[NDB_MAX_RDNS];
412         char dnBuf[2048], *ptr;
413         char idbuf[2*sizeof(ID)];
414         char ocbuf[NDB_OC_BUFLEN];
415         NdbRdns rdns;
416         NdbOcInfo *oci;
417         NdbArgs NA;
418
419         rc = ndb_thread_handle( op, &NA.ndb );
420         rdns.nr_num = 0;
421
422         txn = NA.ndb->startTransaction();
423         if ( !txn ) {
424                 Debug( LDAP_DEBUG_TRACE,
425                         LDAP_XSTRING(ndb_back_search) ": startTransaction failed: %s (%d)\n",
426                         NA.ndb->getNdbError().message, NA.ndb->getNdbError().code, 0 );
427                 rs->sr_err = LDAP_OTHER;
428                 rs->sr_text = "internal error";
429                 goto leave;
430         }
431
432         NA.txn = txn;
433         e.e_name = op->o_req_dn;
434         NA.e = &e;
435         NA.rdns = &rdns;
436         NA.ocs = op->ors_scope == LDAP_SCOPE_BASE ? NULL : &matched;
437
438         rs->sr_err = ndb_entry_get_info( op->o_bd, &NA, 0, &matched );
439         if ( rs->sr_err ) {
440                 if ( rs->sr_err == LDAP_NO_SUCH_OBJECT )
441                         rs->sr_matched = matched.bv_val;
442                 goto leave;
443         }
444
445         if ( op->ors_scope == LDAP_SCOPE_BASE ) {
446                 e.e_name = op->o_req_dn;
447                 e.e_nname = op->o_req_ndn;
448                 rc = ndb_entry_get_data( op->o_bd, &NA, 0 );
449                 ber_bvarray_free( NA.ocs );
450                 rc = test_filter( op, &e, op->ors_filter );
451                 if ( rc == LDAP_COMPARE_TRUE ) {
452                         rs->sr_entry = &e;
453                         rs->sr_attrs = op->ors_attrs;
454                         rs->sr_flags = 0;
455                         send_search_entry( op, rs );
456                         rs->sr_entry = NULL;
457                 }
458                 attrs_free( e.e_attrs );
459                 e.e_attrs = NULL;
460                 rs->sr_err = LDAP_SUCCESS;
461                 goto leave;
462         } else if ( rdns.nr_num == NDB_MAX_RDNS ) {
463                 if ( op->ors_scope == LDAP_SCOPE_ONELEVEL ||
464                         op->ors_scope == LDAP_SCOPE_CHILDREN )
465                 rs->sr_err = LDAP_SUCCESS;
466                 goto leave;
467         }
468
469         /* See if we can handle the filter. Filtering on objectClass is only done
470          * in the DN2ID table scan. If all other filter terms reside in one table,
471          * then we scan the OC table instead of the DN2ID table.
472          */
473         oci = NULL;
474         indexed = 0;
475         ocfilter = 0;
476         rc = ndb_filter_check( ni, op->ors_filter, &oci, &indexed, &ocfilter );
477         if ( rc ) {
478                 Debug( LDAP_DEBUG_TRACE, "ndb_back_search: "
479                         "filter attributes from multiple tables, indexing ignored\n",
480                         0, 0, 0 );
481         } else if ( oci ) {
482                 rc = ndb_oc_search( op, rs, NA.ndb, txn, &rdns, oci, indexed );
483                 goto leave;
484         }
485
486         scan = txn->getNdbIndexScanOperation( "PRIMARY", DN2ID_TABLE );
487         scan->readTuples( NdbOperation::LM_CommittedRead );
488         rc = ndb_dn2bound( scan, &rdns );
489
490         switch( op->ors_scope ) {
491         case LDAP_SCOPE_ONELEVEL:
492                 sf = new NdbScanFilter(scan);
493                 if ( sf->begin() < 0 ||
494                         sf->cmp(NdbScanFilter::COND_NOT_LIKE, rc+3, "_%",
495                                 STRLENOF("_%")) < 0 ||
496                         sf->end() < 0 ) {
497                         rs->sr_err = LDAP_OTHER;
498                         goto leave;
499                 }
500                 /* FALLTHRU */
501         case LDAP_SCOPE_CHILDREN:
502                 /* Note: RDN_COLUMN offset not needed here */
503                 scan->setBound( rc, NdbIndexScanOperation::BoundLT, "\0" );
504                 /* FALLTHRU */
505         case LDAP_SCOPE_SUBTREE:
506                 break;
507         }
508         scanID = scan->getValue( EID_COLUMN, idbuf );
509         scanOC = scan->getValue( OCS_COLUMN, ocbuf );
510         for ( i=0; i<NDB_MAX_RDNS; i++ ) {
511                 rdns.nr_buf[i][0] = '\0';
512                 scanDN[i] = scan->getValue( RDN_COLUMN+i, rdns.nr_buf[i] );
513         }
514         if ( txn->execute( NdbTransaction::NoCommit, NdbOperation::AbortOnError, 1 )) {
515                 rs->sr_err = LDAP_OTHER;
516                 goto leave;
517         }
518
519         e.e_name.bv_val = dnBuf;
520         while ( scan->nextResult() == 0 ) {
521                 if ( op->o_abandon )
522                         break;
523                 e.e_id = scanID->u_64_value();
524                 NA.ocs = ndb_ref2oclist( ocbuf );
525                 for ( i=0; i<NDB_MAX_RDNS; i++ ) {
526                         if ( scanDN[i]->isNULL() || !rdns.nr_buf[i][0] )
527                                 break;
528                 }
529                 ptr = dnBuf;
530                 rdns.nr_num = i;
531                 for ( --i; i>=0; i-- ) {
532                         char *buf;
533                         int len;
534                         buf = rdns.nr_buf[i];
535                         len = *buf++;
536                         ptr = lutil_strncopy( ptr, buf, len );
537                         if ( i ) *ptr++ = ',';
538                 }
539                 *ptr = '\0';
540                 e.e_name.bv_len = ptr - dnBuf;
541                 dnNormalize( 0, NULL, NULL, &e.e_name, &e.e_nname, op->o_tmpmemctx );
542                 NA.txn = NA.ndb->startTransaction();
543                 rc = ndb_entry_get_data( op->o_bd, &NA, 0 );
544                 NA.txn->close();
545                 ber_bvarray_free( NA.ocs );
546                 rc = test_filter( op, &e, op->ors_filter );
547                 if ( rc == LDAP_COMPARE_TRUE ) {
548                         rs->sr_entry = &e;
549                         rs->sr_attrs = op->ors_attrs;
550                         rs->sr_flags = 0;
551                         rc = send_search_entry( op, rs );
552                         rs->sr_entry = NULL;
553                 } else {
554                         rc = 0;
555                 }
556                 attrs_free( e.e_attrs );
557                 e.e_attrs = NULL;
558                 op->o_tmpfree( e.e_nname.bv_val, op->o_tmpmemctx );
559                 if ( rc ) break;
560         }
561 leave:
562         if ( sf )
563                 delete sf;
564         txn->close();
565         send_ldap_result( op, rs );
566         return rs->sr_err;
567 }
568
569 extern "C" int
570 ndb_has_children(
571         NdbArgs *NA,
572         int *hasChildren
573 )
574 {
575         NdbIndexScanOperation *scan;
576         char idbuf[2*sizeof(ID)];
577         int rc;
578
579         if ( NA->rdns->nr_num >= NDB_MAX_RDNS ) {
580                 *hasChildren = LDAP_COMPARE_FALSE;
581                 return 0;
582         }
583                 
584         scan = NA->txn->getNdbIndexScanOperation( "PRIMARY", DN2ID_TABLE );
585         if ( !scan )
586                 return LDAP_OTHER;
587         scan->readTuples( NdbOperation::LM_Read, 0U, 0U, 1U );
588         rc = ndb_dn2bound( scan, NA->rdns );
589         if ( rc < NDB_MAX_RDNS ) {
590                 scan->setBound( rc, NdbIndexScanOperation::BoundLT, "\0" );
591         }
592         scan->interpret_exit_last_row();
593         scan->getValue( EID_COLUMN, idbuf );
594         if ( NA->txn->execute( NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1 )) {
595                 return LDAP_OTHER;
596         }
597         if (rc < NDB_MAX_RDNS && scan->nextResult() == 0 )
598                 *hasChildren = LDAP_COMPARE_TRUE;
599         else
600                 *hasChildren = LDAP_COMPARE_FALSE;
601         return 0;
602 }
603
604 extern "C" int
605 ndb_has_subordinates(
606         Operation *op,
607         Entry *e,
608         int *hasSubordinates )
609 {
610         NdbArgs NA;
611         NdbRdns rdns;
612         int rc;
613
614         NA.rdns = &rdns;
615         rc = ndb_dn2rdns( &e->e_nname, &rdns );
616
617         if ( rc == 0 ) {
618                 rc = ndb_thread_handle( op, &NA.ndb );
619                 NA.txn = NA.ndb->startTransaction();
620                 if ( NA.txn ) {
621                         rc = ndb_has_children( &NA, hasSubordinates );
622                         NA.txn->close();
623                 }
624         }
625
626         return rc;
627 }
628
629 /*
630  * sets the supported operational attributes (if required)
631  */
632 extern "C" int
633 ndb_operational(
634         Operation       *op,
635         SlapReply       *rs )
636 {
637         Attribute       **ap;
638
639         assert( rs->sr_entry != NULL );
640
641         for ( ap = &rs->sr_operational_attrs; *ap; ap = &(*ap)->a_next )
642                 /* just count */ ;
643
644         if ( SLAP_OPATTRS( rs->sr_attr_flags ) ||
645                         ad_inlist( slap_schema.si_ad_hasSubordinates, rs->sr_attrs ) )
646         {
647                 int     hasSubordinates, rc;
648
649                 rc = ndb_has_subordinates( op, rs->sr_entry, &hasSubordinates );
650                 if ( rc == LDAP_SUCCESS ) {
651                         *ap = slap_operational_hasSubordinate( hasSubordinates == LDAP_COMPARE_TRUE );
652                         assert( *ap != NULL );
653
654                         ap = &(*ap)->a_next;
655                 }
656         }
657
658         return LDAP_SUCCESS;
659 }
660