]> git.sur5r.net Git - openldap/blob - servers/slapd/back-ndb/ndbio.cpp
MySQL NDB Cluster backend (experimental)
[openldap] / servers / slapd / back-ndb / ndbio.cpp
1 /* ndbio.cpp - get/set/del data for NDB */
2 /* $OpenLDAP$ */
3 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4  *
5  * Copyright 2008 The OpenLDAP Foundation.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted only as authorized by the OpenLDAP
10  * Public License.
11  *
12  * A copy of this license is available in the file LICENSE in the
13  * top-level directory of the distribution or, alternatively, at
14  * <http://www.OpenLDAP.org/license.html>.
15  */
16 /* ACKNOWLEDGEMENTS:
17  * This work was initially developed by Howard Chu for inclusion
18  * in OpenLDAP Software. This work was sponsored by MySQL.
19  */
20
21 #include "portable.h"
22
23 #include <stdio.h>
24 #include <ac/string.h>
25 #include <ac/errno.h>
26 #include <lutil.h>
27
28 #include "back-ndb.h"
29
30 /* For reference only */
31 typedef struct MedVar {
32         Int16 len;      /* length is always little-endian */
33         char buf[1024];
34 } MedVar;
35
36 extern "C" {
37         static int ndb_name_cmp( const void *v1, const void *v2 );
38         static int ndb_oc_dup_err( void *v1, void *v2 );
39 };
40
41 static int
42 ndb_name_cmp( const void *v1, const void *v2 )
43 {
44         NdbOcInfo *oc1 = (NdbOcInfo *)v1, *oc2 = (NdbOcInfo *)v2;
45         return ber_bvstrcasecmp( &oc1->no_name, &oc2->no_name );
46 }
47
48 static int
49 ndb_oc_dup_err( void *v1, void *v2 )
50 {
51         NdbOcInfo *oc = (NdbOcInfo *)v2;
52
53         oc->no_oc = (ObjectClass *)v1;
54         return -1;
55 }
56
57 /* Find an existing NdbAttrInfo */
58 extern "C" NdbAttrInfo *
59 ndb_ai_find( struct ndb_info *ni, AttributeType *at )
60 {
61         NdbAttrInfo atmp;
62         atmp.na_name = at->sat_cname;
63
64         return (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
65 }
66
67 /* Find or create an NdbAttrInfo */
68 extern "C" NdbAttrInfo *
69 ndb_ai_get( struct ndb_info *ni, struct berval *aname )
70 {
71         NdbAttrInfo atmp, *ai;
72         atmp.na_name = *aname;
73
74         ai = (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
75         if ( !ai ) {
76                 const char *text;
77                 AttributeDescription *ad = NULL;
78
79                 if ( slap_bv2ad( aname, &ad, &text ))
80                         return NULL;
81
82                 ai = (NdbAttrInfo *)ch_malloc( sizeof( NdbAttrInfo ));
83                 ai->na_desc = ad;
84                 ai->na_attr = ai->na_desc->ad_type;
85                 ai->na_name = ai->na_attr->sat_cname;
86                 ai->na_oi = NULL;
87                 ai->na_flag = 0;
88                 ai->na_ixcol = 0;
89                 ai->na_len = ai->na_attr->sat_atype.at_syntax_len;
90                 /* Reasonable default */
91                 if ( !ai->na_len ) {
92                         if ( ai->na_attr->sat_syntax == slap_schema.si_syn_distinguishedName )
93                                 ai->na_len = 1024;
94                         else
95                                 ai->na_len = 128;
96                 }
97                 /* Arbitrary limit */
98                 if ( ai->na_len > 1024 )
99                         ai->na_len = 1024;
100                 avl_insert( &ni->ni_ai_tree, ai, ndb_name_cmp, avl_dup_error );
101         }
102         return ai;
103 }
104
105 static int
106 ndb_ai_check( struct ndb_info *ni, NdbOcInfo *oci, AttributeType **attrs, char **ptr, int *col,
107         int create )
108 {
109         NdbAttrInfo *ai, atmp;
110         int i;
111
112         for ( i=0; attrs[i]; i++ ) {
113                 if ( attrs[i] == slap_schema.si_ad_objectClass->ad_type )
114                         continue;
115                 /* skip attrs that are in a superior */
116                 if ( oci->no_oc && oci->no_oc->soc_sups ) {
117                         int j, k, found=0;
118                         ObjectClass *oc;
119                         for ( j=0; oci->no_oc->soc_sups[j]; j++ ) {
120                                 oc = oci->no_oc->soc_sups[j];
121                                 if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
122                                         continue;
123                                 if ( oc->soc_required ) {
124                                         for ( k=0; oc->soc_required[k]; k++ ) {
125                                                 if ( attrs[i] == oc->soc_required[k] ) {
126                                                         found = 1;
127                                                         break;
128                                                 }
129                                         }
130                                         if ( found ) break;
131                                 }
132                                 if ( oc->soc_allowed ) {
133                                         for ( k=0; oc->soc_allowed[k]; k++ ) {
134                                                 if ( attrs[i] == oc->soc_allowed[k] ) {
135                                                         found = 1;
136                                                         break;
137                                                 }
138                                         }
139                                         if ( found ) break;
140                                 }
141                         }
142                         if ( found )
143                                 continue;
144                 }
145
146                 ai = ndb_ai_get( ni, &attrs[i]->sat_cname );
147                 if ( !ai ) {
148                         /* can never happen */
149                         return LDAP_OTHER;
150                 }
151
152                 /* An indexed attr is defined before its OC is */
153                 if ( !ai->na_oi ) {
154                         ai->na_oi = oci;
155                         ai->na_column = (*col)++;
156                 }
157
158                 oci->no_attrs[oci->no_nattrs++] = ai;
159
160                 /* An attrset attr may already be defined */
161                 if ( ai->na_oi != oci ) {
162                         int j;
163                         for ( j=0; j<oci->no_nsets; j++ )
164                                 if ( oci->no_sets[j] == ai->na_oi ) break;
165                         if ( j >= oci->no_nsets ) {
166                                 /* FIXME: data loss if more sets are in use */
167                                 if ( oci->no_nsets < NDB_MAX_OCSETS ) {
168                                         oci->no_sets[oci->no_nsets++] = ai->na_oi;
169                                 }
170                         }
171                         continue;
172                 }
173
174                 if ( create ) {
175                         *ptr += sprintf( *ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
176                                 ai->na_len );
177                 }
178         }
179         return 0;
180 }
181
182 static int
183 ndb_oc_create( struct ndb_info *ni, NdbOcInfo *oci, int create )
184 {
185         char buf[4096], *ptr;
186         int i, rc = 0, col;
187
188         if ( create ) {
189                 ptr = buf + sprintf( buf,
190                         "CREATE TABLE `%s` (eid bigint unsigned NOT NULL PRIMARY KEY",
191                         oci->no_table.bv_val );
192         }
193
194         col = 0;
195         if ( oci->no_oc->soc_required ) {
196                 for ( i=0; oci->no_oc->soc_required[i]; i++ );
197                 col += i;
198         }
199         if ( oci->no_oc->soc_allowed ) {
200                 for ( i=0; oci->no_oc->soc_allowed[i]; i++ );
201                 col += i;
202         }
203         /* assume all are present */
204         oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
205
206         col = 1;
207         ldap_pvt_thread_rdwr_wlock( &ni->ni_ai_rwlock );
208         if ( oci->no_oc->soc_required ) {
209                 rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, &ptr, &col, create );
210         }
211         if ( !rc && oci->no_oc->soc_allowed ) {
212                 rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, &ptr, &col, create );
213         }
214         ldap_pvt_thread_rdwr_wunlock( &ni->ni_ai_rwlock );
215
216         /* shrink down to just the needed size */
217         oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
218                 oci->no_nattrs * sizeof(struct ndb_attrinfo *));
219
220         if ( create ) {
221                 ptr = lutil_strcopy( ptr, " ) ENGINE=ndb" );
222                 rc = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
223                 if ( rc ) {
224                         Debug( LDAP_DEBUG_ANY,
225                                 "ndb_oc_create: CREATE TABLE %s failed, %s (%d)\n",
226                                 oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
227                 }
228         }
229         return rc;
230 }
231
232 /* Read table definitions from the DB and populate ObjectClassInfo */
233 extern "C" int
234 ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
235 {
236         const NdbDictionary::Table *myTable;
237         const NdbDictionary::Column *myCol;
238         NdbOcInfo *oci, octmp;
239         NdbAttrInfo *ai;
240         ObjectClass *oc;
241         NdbDictionary::Dictionary::List myList;
242         struct berval bv;
243         int i, j, rc, col;
244
245         rc = myDict->listObjects( myList, NdbDictionary::Object::UserTable );
246         /* Populate our objectClass structures */
247         for ( i=0; i<myList.count; i++ ) {
248                 /* Ignore other DBs */
249                 if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
250                         continue;
251                 /* Ignore internal tables */
252                 if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
253                         continue;
254                 ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
255                 oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
256                 if ( oci )
257                         continue;
258
259                 oc = oc_bvfind( &octmp.no_name );
260                 if ( !oc ) {
261                         /* undefined - shouldn't happen */
262                         continue;
263                 }
264                 myTable = myDict->getTable( myList.elements[i].name );
265                 oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
266                 oci->no_table.bv_val = (char *)(oci+1);
267                 strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
268                 oci->no_table.bv_len = oc->soc_cname.bv_len;
269                 oci->no_name = oci->no_table;
270                 oci->no_oc = oc;
271                 oci->no_flag = 0;
272                 oci->no_nsets = 0;
273                 oci->no_nattrs = 0;
274                 col = 0;
275                 /* Make space for all attrs, even tho sups will be dropped */
276                 if ( oci->no_oc->soc_required ) {
277                         for ( j=0; oci->no_oc->soc_required[j]; j++ );
278                         col = j;
279                 }
280                 if ( oci->no_oc->soc_allowed ) {
281                         for ( j=0; oci->no_oc->soc_allowed[j]; j++ );
282                         col += j;
283                 }
284                 oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
285                 avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
286
287                 col = myTable->getNoOfColumns();
288                 /* Skip 0, eid */
289                 for ( j = 1; j<col; j++ ) {
290                         myCol = myTable->getColumn( j );
291                         ber_str2bv( myCol->getName(), 0, 0, &bv );
292                         ai = ndb_ai_get( ni, &bv );
293                         /* shouldn't happen */
294                         if ( !ai )
295                                 continue;
296                         ai->na_oi = oci;
297                         ai->na_column = j;
298                         ai->na_len = myCol->getLength();
299                 }
300         }
301         /* Link to any attrsets */
302         for ( i=0; i<myList.count; i++ ) {
303                 /* Ignore other DBs */
304                 if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
305                         continue;
306                 /* Ignore internal tables */
307                 if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
308                         continue;
309                 ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
310                 oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
311                 /* shouldn't happen */
312                 if ( !oci )
313                         continue;
314                 col = 1;
315                 if ( oci->no_oc->soc_required ) {
316                         rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, NULL, &col, 0 );
317                 }
318                 if ( oci->no_oc->soc_allowed ) {
319                         rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, NULL, &col, 0 );
320                 }
321                 /* shrink down to just the needed size */
322                 oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
323                         oci->no_nattrs * sizeof(struct ndb_attrinfo *));
324         }
325         return 0;
326 }
327
328 static int
329 ndb_oc_get( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
330         struct berval *oname, NdbOcs *out )
331 {
332         const NdbDictionary::Table *myTable;
333         NdbOcInfo *oci, octmp;
334         ObjectClass *oc;
335         int i, rc;
336
337         /* shortcut top */
338         if ( ber_bvstrcasecmp( oname, &slap_schema.si_oc_top->soc_cname )) {
339                 octmp.no_name = *oname;
340                 oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
341                 if ( oci ) {
342                         oc = oci->no_oc;
343                 } else {
344                         oc = oc_bvfind( oname );
345                         if ( !oc ) {
346                                 /* undefined - shouldn't happen */
347                                 return LDAP_INVALID_SYNTAX;
348                         }
349                 }
350                 if ( oc->soc_sups ) {
351                         int i;
352
353                         for ( i=0; oc->soc_sups[i]; i++ ) {
354                                 rc = ndb_oc_get( ni, myDict, &oc->soc_sups[i]->soc_cname, out );
355                                 if ( rc ) return rc;
356                         }
357                 }
358         } else {
359                 oc = slap_schema.si_oc_top;
360         }
361         /* Only insert once */
362         for ( i=0; i<out->no_ntext; i++ )
363                 if ( out->no_text[i].bv_val == oc->soc_cname.bv_val )
364                         break;
365         if ( i == out->no_ntext )
366                 out->no_text[out->no_ntext++] = oc->soc_cname;
367
368         /* ignore top, etc... */
369         if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
370                 return 0;
371
372         if ( !oci ) {
373                 ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
374                 oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
375                 oci->no_table.bv_val = (char *)(oci+1);
376                 strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
377                 oci->no_table.bv_len = oc->soc_cname.bv_len;
378                 oci->no_name = oci->no_table;
379                 oci->no_oc = oc;
380                 oci->no_flag = 0;
381                 oci->no_nsets = 0;
382                 oci->no_nattrs = 0;
383                 ldap_pvt_thread_rdwr_wlock( &ni->ni_oc_rwlock );
384                 if ( avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, ndb_oc_dup_err )) {
385                         octmp.no_oc = oci->no_oc;
386                         ch_free( oci );
387                         oci = (NdbOcInfo *)octmp.no_oc;
388                 }
389                 /* see if the oc table already exists in the DB */
390                 myTable = myDict->getTable( oci->no_table.bv_val );
391                 rc = ndb_oc_create( ni, oci, myTable == NULL );
392                 ldap_pvt_thread_rdwr_wunlock( &ni->ni_oc_rwlock );
393                 ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
394                 if ( rc ) return rc;
395         }
396         /* Only insert once */
397         for ( i=0; i<out->no_ninfo; i++ )
398                 if ( out->no_info[i] == oci )
399                         break;
400         if ( i == out->no_ninfo )
401                 out->no_info[out->no_ninfo++] = oci;
402         return 0;
403 }
404
405 extern "C" int
406 ndb_aset_get( struct ndb_info *ni, struct berval *sname, struct berval *attrs, NdbOcInfo **ret )
407 {
408         NdbOcInfo *oci, octmp;
409         int i, rc;
410
411         octmp.no_name = *sname;
412         oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
413         if ( oci )
414                 return LDAP_ALREADY_EXISTS;
415
416         for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ ) {
417                 if ( !at_bvfind( &attrs[i] ))
418                         return LDAP_NO_SUCH_ATTRIBUTE;
419         }
420         i++;
421
422         oci = (NdbOcInfo *)ch_calloc( 1, sizeof( NdbOcInfo ) + sizeof( ObjectClass ) +
423                 i*sizeof(AttributeType *) + sname->bv_len+1 );
424         oci->no_oc = (ObjectClass *)(oci+1);
425         oci->no_oc->soc_required = (AttributeType **)(oci->no_oc+1);
426         oci->no_table.bv_val = (char *)(oci->no_oc->soc_required+i);
427
428         for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ )
429                 oci->no_oc->soc_required[i] = at_bvfind( &attrs[i] );
430
431         strcpy( oci->no_table.bv_val, sname->bv_val );
432         oci->no_table.bv_len = sname->bv_len;
433         oci->no_name = oci->no_table;
434         oci->no_oc->soc_cname = oci->no_name;
435         oci->no_flag = NDB_INFO_ATSET;
436
437         rc = ndb_oc_create( ni, oci, 0 );
438         if ( !rc )
439                 rc = avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
440         if ( rc ) {
441                 ch_free( oci );
442         } else {
443                 *ret = oci;
444         }
445         return rc;
446 }
447
448 extern "C" int
449 ndb_aset_create( struct ndb_info *ni, NdbOcInfo *oci )
450 {
451         char buf[4096], *ptr;
452         NdbAttrInfo *ai;
453         int i;
454
455         ptr = buf + sprintf( buf,
456                 "CREATE TABLE IF NOT EXISTS `%s` (eid bigint unsigned NOT NULL PRIMARY KEY",
457                 oci->no_table.bv_val );
458
459         for ( i=0; i<oci->no_nattrs; i++ ) {
460                 if ( oci->no_attrs[i]->na_oi != oci )
461                         continue;
462                 ai = oci->no_attrs[i];
463                 ptr += sprintf( ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
464                         ai->na_len );
465                 if ( ai->na_flag & NDB_INFO_INDEX ) {
466                         ptr += sprintf( ptr, ", INDEX (`%s`)", ai->na_attr->sat_cname.bv_val );
467                 }
468         }
469         ptr = lutil_strcopy( ptr, " ) ENGINE=ndb" );
470         i = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
471         if ( i ) {
472                 Debug( LDAP_DEBUG_ANY,
473                         "ndb_aset_create: CREATE TABLE %s failed, %s (%d)\n",
474                         oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
475         }
476         return i;
477 }
478
479 static int
480 ndb_oc_check( BackendDB *be, Ndb *ndb,
481         struct berval *ocsin, NdbOcs *out )
482 {
483         struct ndb_info *ni = (struct ndb_info *) be->be_private;
484         const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
485
486         int i, rc = 0;
487
488         out->no_ninfo = 0;
489         out->no_ntext = 0;
490
491         /* Find all objectclasses and their superiors. List
492          * the superiors first.
493          */
494
495         ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
496         for ( i=0; !BER_BVISNULL( &ocsin[i] ); i++ ) {
497                 rc = ndb_oc_get( ni, myDict, &ocsin[i], out );
498                 if ( rc ) break;
499         }
500         ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
501         return rc;
502 }
503
504 /* set all the unique attrs of this objectclass into the table
505  * max row size is 8192 bytes; how do we detect if the row is too large?
506  *
507  * FIXME: Currently only stores the first value of any multivalued attribute.
508  */
509 extern "C" int
510 ndb_oc_attrs(
511         NdbTransaction *txn,
512         const NdbDictionary::Table *myTable,
513         Entry *e,
514         NdbOcInfo *no,
515         NdbAttrInfo **attrs,
516         int nattrs,
517         int update,
518         int *num,
519         NdbOperation **retop
520 )
521 {
522         char buf[65538], *ptr;
523         Attribute *a;
524         NdbOperation *myop = retop ? *retop : NULL;
525         int i;
526
527         for ( i=0; i<nattrs; i++ ) {
528                 /* Skip if not in this table */
529                 if ( attrs[i]->na_oi != no )
530                         continue;
531                 for ( a=e->e_attrs; a; a=a->a_next ) {
532                         if ( a->a_desc->ad_type == attrs[i]->na_attr )
533                                 break;
534                 }
535                 /* If we found a match, set its value. If we found no match
536                  * and we're updating, delete its value.
537                  */
538                 if ( a || update ) {
539                         /* objectclass is in dn_idx_table */
540                         if ( a && a->a_desc == slap_schema.si_ad_objectClass )
541                                 continue;
542
543                         /* First attr, get the op, set the type and primary key */
544                         if ( !*num ) {
545                                 Uint64 eid = e->e_id;
546                                 myop = txn->getNdbOperation( myTable );
547                                 if ( !myop )
548                                         return LDAP_OTHER;
549                                 if ( update ) {
550                                         if ( myop->writeTuple())
551                                                 return LDAP_OTHER;
552                                 } else {
553                                         if ( myop->insertTuple())
554                                                 return LDAP_OTHER;
555                                 }
556                                 if ( myop->equal( EID_COLUMN, eid ))
557                                         return LDAP_OTHER;
558                         }
559                         ptr = buf;
560                         if ( a ) {
561                                 if ( a->a_vals[0].bv_len > attrs[i]->na_len ) {
562                                         Debug( LDAP_DEBUG_ANY, "ndb_oc_attrs: attribute %s too long for column\n",
563                                                 attrs[i]->na_name.bv_val, 0, 0 );
564                                         return LDAP_CONSTRAINT_VIOLATION;
565                                 }
566                                 *ptr++ = a->a_vals[0].bv_len & 0xff;
567                                 if ( attrs[i]->na_len > 255 ) {
568                                         /* MedVar */
569                                         *ptr++ = a->a_vals[0].bv_len >> 8;
570                                 }
571                                 memcpy( ptr, a->a_vals[0].bv_val, a->a_vals[0].bv_len );
572                                 ptr = buf;
573                         } else {
574                                 ptr = NULL;
575                         }
576                         if ( myop->setValue( attrs[i]->na_column, ptr ))
577                                 return LDAP_OTHER;
578                         (*num)++;
579                 }
580         }
581         if ( retop )
582                 *retop = myop;
583         return LDAP_SUCCESS;
584 }
585
586 static int
587 ndb_oc_put(
588         const NdbDictionary::Dictionary *myDict,
589         NdbTransaction *txn, NdbOcInfo *no, Entry *e, int update )
590 {
591         const NdbDictionary::Table *myTable;
592         int i, rc;
593
594         for ( i=0; i<no->no_nsets; i++ ) {
595                 rc = ndb_oc_put( myDict, txn, no->no_sets[i], e, update );
596                 if ( rc )
597                         return rc;
598         }
599
600         myTable = myDict->getTable( no->no_table.bv_val );
601         if ( !myTable )
602                 return LDAP_OTHER;
603
604         i = 0;
605         return ndb_oc_attrs( txn, myTable, e, no, no->no_attrs, no->no_nattrs, update, &i, NULL );
606 }
607
608 extern "C" int
609 ndb_entry_put_data(
610         BackendDB *be,
611         NdbArgs *NA,
612         int update
613 )
614 {
615         struct ndb_info *ni = (struct ndb_info *) be->be_private;
616         ObjectClass *oc;
617         Attribute *aoc, *a;
618         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
619         const NdbDictionary::Table *myTable;
620         NdbOperation *myop;
621         NdbOcs myOcs;
622         int i, rc;
623
624         /* Get the entry's objectClass attribute */
625         aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
626         if ( !aoc )
627                 return LDAP_OTHER;
628
629         ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
630         myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
631
632         /* Walk thru objectclasses, find all the attributes belonging to a class */
633         for ( i=0; i<myOcs.no_ninfo; i++ ) {
634                 rc = ndb_oc_put( myDict, NA->txn, myOcs.no_info[i], NA->e, update );
635                 if ( rc ) return rc;
636         }
637
638         return 0;
639 }
640
641 static void
642 ndb_oc_get( NdbOcInfo *no, int *j, int *nocs, NdbOcInfo ***oclist )
643 {
644         int i;
645         NdbOcInfo  **ol2;
646
647         for ( i=0; i<no->no_nsets; i++ ) {
648                 ndb_oc_get( no->no_sets[i], j, nocs, oclist );
649         }
650         if ( *j >= *nocs ) {
651                 *nocs *= 2;
652                 ol2 = (NdbOcInfo **)ch_realloc( *oclist, *nocs * sizeof(NdbOcInfo *));
653                 *oclist = ol2;
654         }
655         ol2 = *oclist;
656         ol2[(*j)++] = no;
657 }
658
659 /* Retrieve attribute data for given entry. The entry's DN and eid should
660  * already be populated.
661  */
662 extern "C" int
663 ndb_entry_get_data(
664         BackendDB *be,
665         NdbArgs *NA,
666         int update
667 )
668 {
669         struct ndb_info *ni = (struct ndb_info *) be->be_private;
670         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
671         const NdbDictionary::Table *myTable;
672         NdbOperation *myop;
673         Uint64 eid;
674
675         Attribute *aoc, *a;
676         NdbOcs myOcs;
677         NdbOcInfo *oci, **oclist = NULL;
678         char abuf[65536], *ptr, **attrs = NULL;
679
680         /* FIXME: abuf should be dynamically allocated */
681
682         int i, j, k, nocs, nattrs, rc = LDAP_OTHER, alen;
683
684         attr_merge( NA->e, slap_schema.si_ad_objectClass, NA->ocs, NULL );
685
686         eid = NA->e->e_id;
687
688         ndb_oc_check( be, NA->ndb, NA->ocs, &myOcs );
689         myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
690         nocs = myOcs.no_ninfo;
691
692         oclist = (NdbOcInfo **)ch_calloc( 1, nocs * sizeof(NdbOcInfo *));
693
694         for ( i=0, j=0; i<myOcs.no_ninfo; i++ ) {
695                 ndb_oc_get( myOcs.no_info[i], &j, &nocs, &oclist );
696         }
697
698         nocs = j;
699         nattrs = 0;
700         for ( i=0; i<nocs; i++ )
701                 nattrs += oclist[i]->no_nattrs;
702
703         attrs = (char **)ch_malloc( nattrs * sizeof(char *));
704
705         k = 0;
706         ptr = abuf;
707         for ( i=0; i<nocs; i++ ) {
708                 oci = oclist[i];
709                 myTable = myDict->getTable( oci->no_table.bv_val );
710
711                 myop = NA->txn->getNdbOperation( myTable );
712                 if ( !myop )
713                         goto leave;
714                 if ( myop->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead ))
715                         goto leave;
716                 if ( myop->equal( EID_COLUMN, eid ))
717                         goto leave;
718
719                 for ( j=0; j<oci->no_nattrs; j++ ) {
720                         if ( oci->no_attrs[j]->na_oi != oci )
721                                 continue;
722                         attrs[k] = ptr;
723                         *ptr++ = 0;
724                         if ( oci->no_attrs[j]->na_len > 255 )
725                                 *ptr++ = 0;
726                         ptr += oci->no_attrs[j]->na_len + 1;
727                         myop->getValue( oci->no_attrs[j]->na_column, attrs[k++] );
728                 }
729         }
730         if ( NA->txn->execute( update ? NdbTransaction::NoCommit : NdbTransaction::Commit,
731                 update ? NdbOperation::AO_IgnoreError : NdbOperation::AbortOnError, 1) < 0 )
732                 goto leave;
733
734         /* count results */
735         nattrs = 0;
736         k = 0;
737         for ( i=0; i<nocs; i++ ) {
738                 oci = oclist[i];
739                 for ( j=0; j<oci->no_nattrs; j++ ) {
740                         unsigned char *buf;
741                         int len;
742                         if ( oci->no_attrs[j]->na_oi != oci )
743                                 continue;
744                         buf = (unsigned char *)attrs[k++];
745                         len = buf[0];
746                         if ( oci->no_attrs[j]->na_len > 255 ) {
747                                 /* MedVar */
748                                 len |= (buf[1] << 8);
749                         }
750                         if ( !len )
751                                 continue;
752                         nattrs++;
753                 }
754         }
755
756         a = attrs_alloc( nattrs );
757         NA->e->e_attrs->a_next = a;
758         k = 0;
759         for ( i=0; i<nocs; i++ ) {
760                 oci = oclist[i];
761                 for ( j=0; j<oci->no_nattrs; j++ ) {
762                         unsigned char *buf;
763                         struct berval bv, nbv;
764                         if ( oci->no_attrs[j]->na_oi != oci )
765                                 continue;
766                         buf = (unsigned char *)attrs[k++];
767                         bv.bv_len = buf[0];
768                         if ( oci->no_attrs[j]->na_len > 255 ) {
769                                 /* MedVar */
770                                 bv.bv_len |= (buf[1] << 8);
771                                 bv.bv_val = (char *)buf+2;
772                         } else {
773                                 bv.bv_val = (char *)buf+1;
774                         }
775                         if ( bv.bv_len == 0 )
776                                 continue;
777                         bv.bv_val[bv.bv_len] = '\0';
778                         a->a_desc = oci->no_attrs[j]->na_desc;
779                         attr_normalize_one( a->a_desc, &bv, &nbv, NULL );
780                         a->a_vals = NULL;
781                         a->a_nvals = NULL;
782                         a->a_numvals = 1;
783                         value_add_one( &a->a_vals, &bv );
784                         if ( !BER_BVISNULL( &nbv )) {
785                                 ber_bvarray_add( &a->a_nvals, &nbv );
786                         } else {
787                                 a->a_nvals = a->a_vals;
788                         }
789                         a = a->a_next;
790                 }
791         }
792
793         rc = 0;
794 leave:
795         if ( attrs ) {
796                 ch_free( attrs );
797         }
798         if ( oclist ) {
799                 ch_free( oclist );
800         }
801
802         return rc;
803 }
804
805 static int
806 ndb_oc_del( 
807         const NdbDictionary::Dictionary *myDict,
808         NdbTransaction *txn, Uint64 eid, NdbOcInfo *no )
809 {
810         const NdbDictionary::Table *myTable;
811         NdbOperation *myop;
812         int i, rc;
813
814         for ( i=0; i<no->no_nsets; i++ ) {
815                 rc = ndb_oc_del( myDict, txn, eid, no->no_sets[i] );
816                 if ( rc ) rc;
817         }
818         myTable = myDict->getTable( no->no_table.bv_val );
819
820         myop = txn->getNdbOperation( myTable );
821         if ( !myop )
822                 return LDAP_OTHER;
823         if ( myop->deleteTuple() )
824                 return LDAP_OTHER;
825         if ( myop->equal( EID_COLUMN, eid ))
826                 return LDAP_OTHER;
827
828         return 0;
829 }
830
831 extern "C" int
832 ndb_entry_del_data(
833         BackendDB *be,
834         NdbArgs *NA
835 )
836 {
837         struct ndb_info *ni = (struct ndb_info *) be->be_private;
838         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
839         const NdbDictionary::Table *myTable;
840         NdbOperation *myop;
841         Uint64 eid = NA->e->e_id;
842         int i;
843         NdbOcs myOcs;
844
845         ndb_oc_check( be, NA->ndb, NA->ocs, &myOcs );
846         myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
847
848         for ( i=0; i<myOcs.no_ninfo; i++ ) {
849                 if ( ndb_oc_del( myDict, NA->txn, eid, myOcs.no_info[i] ))
850                         return LDAP_OTHER;
851         }
852
853         return 0;
854 }
855
856 extern "C" int
857 ndb_dn2rdns(
858         struct berval *dn,
859         NdbRdns *rdns
860 )
861 {
862         char *beg, *end;
863         int i, len;
864
865         /* Walk thru RDNs */
866         end = dn->bv_val + dn->bv_len;
867         for ( i=0; i<NDB_MAX_RDNS; i++ ) {
868                 for ( beg = end-1; beg > dn->bv_val; beg-- ) {
869                         if (*beg == ',') {
870                                 beg++;
871                                 break;
872                         }
873                 }
874                 if ( beg >= dn->bv_val ) {
875                         len = end - beg;
876                         /* RDN is too long */
877                         if ( len > NDB_RDN_LEN )
878                                 return LDAP_CONSTRAINT_VIOLATION;
879                         memcpy( rdns->nr_buf[i]+1, beg, len );
880                 } else {
881                         break;
882                 }
883                 rdns->nr_buf[i][0] = len;
884                 end = beg - 1;
885         }
886         /* Too many RDNs in DN */
887         if ( i == NDB_MAX_RDNS && beg > dn->bv_val ) {
888                         return LDAP_CONSTRAINT_VIOLATION;
889         }
890         rdns->nr_num = i;
891         return 0;
892 }
893
894 static int
895 ndb_rdns2keys(
896         NdbOperation *myop,
897         NdbRdns *rdns
898 )
899 {
900         int i;
901         char dummy[2] = {0,0};
902
903         /* Walk thru RDNs */
904         for ( i=0; i<rdns->nr_num; i++ ) {
905                 if ( myop->equal( i+RDN_COLUMN, rdns->nr_buf[i] ))
906                         return LDAP_OTHER;
907         }
908         for ( ; i<NDB_MAX_RDNS; i++ ) {
909                 if ( myop->equal( i+RDN_COLUMN, dummy ))
910                         return LDAP_OTHER;
911         }
912         return 0;
913 }
914
915 /* Store the DN2ID_TABLE fields */
916 extern "C" int
917 ndb_entry_put_info(
918         BackendDB *be,
919         NdbArgs *NA,
920         int update
921 )
922 {
923         struct ndb_info *ni = (struct ndb_info *) be->be_private;
924         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
925         const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
926         NdbOperation *myop;
927         NdbAttrInfo *ai;
928         Attribute *aoc, *a;
929
930         /* Get the entry's objectClass attribute; it's ok to be
931          * absent on a fresh insert
932          */
933         aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
934         if ( update && !aoc )
935                 return LDAP_OBJECT_CLASS_VIOLATION;
936
937         myop = NA->txn->getNdbOperation( myTable );
938         if ( !myop )
939                 return LDAP_OTHER;
940         if ( update ) {
941                 if ( myop->updateTuple())
942                         return LDAP_OTHER;
943         } else {
944                 if ( myop->insertTuple())
945                         return LDAP_OTHER;
946         }
947
948         if ( ndb_rdns2keys( myop, NA->rdns ))
949                 return LDAP_OTHER;
950
951         /* Set entry ID */
952         {
953                 Uint64 eid = NA->e->e_id;
954                 if ( myop->setValue( EID_COLUMN, eid ))
955                         return LDAP_OTHER;
956         }
957
958         /* Set list of objectClasses */
959         if ( aoc ) {
960                 char *ptr, buf[sizeof(MedVar)];
961                 NdbOcs myOcs;
962                 int i;
963
964                 ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
965                 ptr = buf+2;
966                 for ( i=0; i<myOcs.no_ntext; i++ ) {
967                         /* data loss... */
968                         if ( ptr + myOcs.no_text[i].bv_len >= &buf[sizeof(buf)] )
969                                 break;
970                         if ( i ) *ptr++ = ' ';
971                         ptr = lutil_strcopy( ptr, myOcs.no_text[i].bv_val );
972                 }
973                 i = ptr - buf - 2;
974                 buf[0] = i & 0xff;
975                 buf[1] = i >> 8;
976                 if ( myop->setValue( OCS_COLUMN, buf ))
977                         return LDAP_OTHER;
978         }
979
980         /* Set any indexed attrs */
981         for ( a = NA->e->e_attrs; a; a=a->a_next ) {
982                 ai = ndb_ai_find( ni, a->a_desc->ad_type );
983                 if ( ai && ( ai->na_flag & NDB_INFO_INDEX )) {
984                         char *ptr, buf[sizeof(MedVar)];
985                         int len;
986
987                         ptr = buf+1;
988                         len = a->a_vals[0].bv_len;
989                         /* FIXME: data loss */
990                         if ( len > ai->na_len )
991                                 len = ai->na_len;
992                         buf[0] = len & 0xff;
993                         if ( ai->na_len > 255 ) {
994                                 *ptr++ = len >> 8;
995                         }
996                         memcpy( ptr, a->a_vals[0].bv_val, len );
997                         if ( myop->setValue( ai->na_ixcol, buf ))
998                                 return LDAP_OTHER;
999                 }
1000         }
1001
1002         return 0;
1003 }
1004
1005 extern "C" struct berval *
1006 ndb_str2bvarray(
1007         char *str,
1008         int len,
1009         char delim
1010 )
1011 {
1012         struct berval *list, tmp;
1013         char *beg;
1014         int i, num;
1015
1016         for ( i = 1, beg = str;; i++ ) {
1017                 beg = strchr( beg, delim );
1018                 if ( !beg )
1019                         break;
1020                 beg++;
1021         }
1022
1023         num = i;
1024         list = (struct berval *)ch_malloc( (num+1)*sizeof(struct berval));
1025
1026         for ( i = 0, beg = str; i<num; i++ ) {
1027                 tmp.bv_val = beg;
1028                 beg = strchr( beg, delim );
1029                 if ( beg ) {
1030                         tmp.bv_len = beg - tmp.bv_val;
1031                 } else {
1032                         tmp.bv_len = len - (tmp.bv_val - str);
1033                 }
1034                 ber_dupbv( &list[i], &tmp );
1035                 beg++;
1036         }
1037
1038         BER_BVZERO( &list[i] );
1039         return list;
1040 }
1041
1042 extern "C" struct berval *
1043 ndb_ref2oclist(
1044         const char *ref
1045 )
1046 {
1047         /* MedVar */
1048         int len = ref[0] | (ref[1] << 8);
1049         return ndb_str2bvarray( (char *)ref+2, len, ' ' );
1050 }
1051
1052 /* Retrieve the DN2ID_TABLE fields. Can call with NULL ocs if just verifying
1053  * the existence of a DN.
1054  */
1055 extern "C" int
1056 ndb_entry_get_info(
1057         BackendDB *be,
1058         NdbArgs *NA,
1059         int update,
1060         struct berval *matched
1061 )
1062 {
1063         struct ndb_info *ni = (struct ndb_info *) be->be_private;
1064         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1065         const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1066         NdbOperation *myop;
1067         NdbRecAttr *attr1, *attr2;
1068         char idbuf[2*sizeof(ID)];
1069         char ocbuf[NDB_OC_BUFLEN];
1070
1071         if ( matched ) {
1072                 BER_BVZERO( matched );
1073         }
1074         if ( !myTable ) {
1075                 return LDAP_OTHER;
1076         }
1077
1078         myop = NA->txn->getNdbOperation( myTable );
1079         if ( !myop ) {
1080                 return LDAP_OTHER;
1081         }
1082
1083         if ( myop->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead )) {
1084                 return LDAP_OTHER;
1085         }
1086
1087         if ( !NA->rdns->nr_num && ndb_dn2rdns( &NA->e->e_name, NA->rdns )) {
1088                 return LDAP_NO_SUCH_OBJECT;
1089         }
1090
1091         if ( ndb_rdns2keys( myop, NA->rdns )) {
1092                 return LDAP_OTHER;
1093         }
1094
1095         attr1 = myop->getValue( EID_COLUMN, idbuf );
1096         if ( !attr1 ) {
1097                 return LDAP_OTHER;
1098         }
1099
1100         ocbuf[0] = 0;
1101         ocbuf[1] = 0;
1102         if ( !NA->ocs ) {
1103                 attr2 = myop->getValue( OCS_COLUMN, ocbuf );
1104                 if ( !attr2 ) {
1105                         return LDAP_OTHER;
1106                 }
1107         }
1108
1109         if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1110                 return LDAP_OTHER;
1111         }
1112
1113         switch( myop->getNdbError().code ) {
1114         case 0:
1115                 if ( !attr1->isNULL() && ( NA->e->e_id = attr1->u_64_value() )) {
1116                         /* If we didn't care about OCs, or we got them */
1117                         if ( NA->ocs || ocbuf[0] || ocbuf[1] ) {
1118                                 /* If wanted, return them */
1119                                 if ( !NA->ocs )
1120                                         NA->ocs = ndb_ref2oclist( ocbuf );
1121                                 break;
1122                         }
1123                 }
1124                 /* FALLTHRU */
1125         case NDB_NO_SUCH_OBJECT:        /* no such tuple: look for closest parent */
1126                 if ( matched ) {
1127                         NdbOperation *ops[NDB_MAX_RDNS];
1128                         int i, j, k;
1129                         char dummy[2] = {0,0};
1130
1131                         /* get to last RDN, then back up 1 */
1132                         k = NA->rdns->nr_num - 1;
1133
1134                         for ( i=0; i<k; i++ ) {
1135                                 ops[i] = NA->txn->getNdbOperation( myTable );
1136                                 if ( !ops[i] )
1137                                         return LDAP_OTHER;
1138                                 if ( ops[i]->readTuple( NdbOperation::LM_CommittedRead ))
1139                                         return LDAP_OTHER;
1140                                 for ( j=0; j<=i; j++ ) {
1141                                         if ( ops[i]->equal( j+RDN_COLUMN, NA->rdns->nr_buf[j] ))
1142                                                 return LDAP_OTHER;
1143                                 }
1144                                 for ( ;j<NDB_MAX_RDNS; j++ ) {
1145                                         if ( ops[i]->equal( j+RDN_COLUMN, dummy ))
1146                                                 return LDAP_OTHER;
1147                                 }
1148                         }
1149                         if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1150                                 return LDAP_OTHER;
1151                         }
1152                         for ( --i; i>=0; i-- ) {
1153                                 if ( ops[i]->getNdbError().code == 0 ) {
1154                                         for ( j=0; j<=i; j++ )
1155                                                 matched->bv_len += NA->rdns->nr_buf[j][0];
1156                                         matched->bv_len += i;
1157                                         matched->bv_val = NA->e->e_name.bv_val +
1158                                                 NA->e->e_name.bv_len - matched->bv_len;
1159                                         break;
1160                                 }
1161                         }
1162                 }
1163                 return LDAP_NO_SUCH_OBJECT;
1164         default:
1165                 return LDAP_OTHER;
1166         }
1167
1168         return 0;
1169 }
1170
1171 extern "C" int
1172 ndb_entry_del_info(
1173         BackendDB *be,
1174         NdbArgs *NA
1175 )
1176 {
1177         struct ndb_info *ni = (struct ndb_info *) be->be_private;
1178         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1179         const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1180         NdbOperation *myop;
1181
1182         myop = NA->txn->getNdbOperation( myTable );
1183         if ( !myop )
1184                 return LDAP_OTHER;
1185         if ( myop->deleteTuple())
1186                 return LDAP_OTHER;
1187
1188         if ( ndb_rdns2keys( myop, NA->rdns ))
1189                 return LDAP_OTHER;
1190
1191         /* Let caller invoke the roundtrip */
1192         /* return txn->execute(NoCommit); */
1193         return 0;
1194 }
1195
1196 extern "C" int
1197 ndb_next_id(
1198         BackendDB *be,
1199         Ndb *ndb,
1200         ID *id
1201 )
1202 {
1203         struct ndb_info *ni = (struct ndb_info *) be->be_private;
1204         const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
1205         const NdbDictionary::Table *myTable = myDict->getTable( NEXTID_TABLE );
1206         Uint64 nid = 0;
1207         int rc;
1208
1209         if ( !myTable ) {
1210                 Debug( LDAP_DEBUG_ANY, "ndb_next_id: " NEXTID_TABLE " table is missing\n",
1211                         0, 0, 0 );
1212                 return LDAP_OTHER;
1213         }
1214
1215         rc = ndb->getAutoIncrementValue( myTable, nid, 1000 );
1216         if ( !rc )
1217                 *id = nid;
1218         return rc;
1219 }
1220
1221 extern "C" { static void ndb_thread_hfree( void *key, void *data ); };
1222 static void
1223 ndb_thread_hfree( void *key, void *data )
1224 {
1225         Ndb *ndb = (Ndb *)data;
1226         delete ndb;
1227 }
1228
1229 extern "C" int
1230 ndb_thread_handle(
1231         Operation *op,
1232         Ndb **ndb )
1233 {
1234         struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1235         void *data;
1236
1237         if ( ldap_pvt_thread_pool_getkey( op->o_threadctx, ni, &data, NULL )) {
1238                 Ndb *myNdb;
1239                 int rc;
1240                 ldap_pvt_thread_mutex_lock( &ni->ni_conn_mutex );
1241                 myNdb = new Ndb( ni->ni_cluster[ni->ni_nextconn++], ni->ni_dbname );
1242                 if ( ni->ni_nextconn >= ni->ni_nconns )
1243                         ni->ni_nextconn = 0;
1244                 ldap_pvt_thread_mutex_unlock( &ni->ni_conn_mutex );
1245                 if ( !myNdb ) {
1246                         return LDAP_OTHER;
1247                 }
1248                 rc = myNdb->init(1024);
1249                 if ( rc ) {
1250                         delete myNdb;
1251                         Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1252                                 rc, 0, 0 );
1253                         return rc;
1254                 }
1255                 data = (void *)myNdb;
1256                 if (( rc = ldap_pvt_thread_pool_setkey( op->o_threadctx, ni,
1257                         data, ndb_thread_hfree, NULL, NULL ))) {
1258                         delete myNdb;
1259                         Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1260                                 rc, 0, 0 );
1261                         return rc;
1262                 }
1263         }
1264         *ndb = (Ndb *)data;
1265         return 0;
1266 }
1267
1268 extern "C" int
1269 ndb_entry_get(
1270         Operation *op,
1271         struct berval *ndn,
1272         ObjectClass *oc,
1273         AttributeDescription *ad,
1274         int rw,
1275         Entry **ent )
1276 {
1277         struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1278         NdbArgs NA;
1279         Entry e = {0};
1280         int rc;
1281
1282         /* Get our NDB handle */
1283         rc = ndb_thread_handle( op, &NA.ndb );
1284
1285         NA.txn = NA.ndb->startTransaction();
1286         if( !NA.txn ) {
1287                 Debug( LDAP_DEBUG_TRACE,
1288                         LDAP_XSTRING(ndb_entry_get) ": startTransaction failed: %s (%d)\n",
1289                         NA.ndb->getNdbError().message, NA.ndb->getNdbError().code, 0 );
1290                 return 1;
1291         }
1292
1293         e.e_name = *ndn;
1294         NA.e = &e;
1295         /* get entry */
1296         {
1297                 NdbRdns rdns;
1298                 rdns.nr_num = 0;
1299                 NA.ocs = NULL;
1300                 NA.rdns = &rdns;
1301                 rc = ndb_entry_get_info( op->o_bd, &NA, rw, NULL );
1302         }
1303         if ( rc == 0 ) {
1304                 e.e_name = *ndn;
1305                 e.e_nname = *ndn;
1306                 rc = ndb_entry_get_data( op->o_bd, &NA, 0 );
1307                 ber_bvarray_free( NA.ocs );
1308                 if ( rc == 0 ) {
1309                         if ( oc && !is_entry_objectclass_or_sub( &e, oc )) {
1310                                 attrs_free( e.e_attrs );
1311                                 rc = 1;
1312                         }
1313                 }
1314         }
1315         if ( rc == 0 ) {
1316                 *ent = entry_alloc();
1317                 **ent = e;
1318                 ber_dupbv( &(*ent)->e_name, ndn );
1319                 ber_dupbv( &(*ent)->e_nname, ndn );
1320         } else {
1321                 rc = 1;
1322         }
1323         NA.txn->close();
1324         return rc;
1325 }
1326
1327 /* Congestion avoidance code
1328  * for Deadlock Rollback
1329  */
1330
1331 extern "C" void
1332 ndb_trans_backoff( int num_retries )
1333 {
1334         int i;
1335         int delay = 0;
1336         int pow_retries = 1;
1337         unsigned long key = 0;
1338         unsigned long max_key = -1;
1339         struct timeval timeout;
1340
1341         lutil_entropy( (unsigned char *) &key, sizeof( unsigned long ));
1342
1343         for ( i = 0; i < num_retries; i++ ) {
1344                 if ( i >= 5 ) break;
1345                 pow_retries *= 4;
1346         }
1347
1348         delay = 16384 * (key * (double) pow_retries / (double) max_key);
1349         delay = delay ? delay : 1;
1350
1351         Debug( LDAP_DEBUG_TRACE,  "delay = %d, num_retries = %d\n", delay, num_retries, 0 );
1352
1353         timeout.tv_sec = delay / 1000000;
1354         timeout.tv_usec = delay % 1000000;
1355         select( 0, NULL, NULL, NULL, &timeout );
1356 }