]> git.sur5r.net Git - openldap/blob - servers/slapd/back-ndb/ndbio.cpp
Happy New Year
[openldap] / servers / slapd / back-ndb / ndbio.cpp
1 /* ndbio.cpp - get/set/del data for NDB */
2 /* $OpenLDAP$ */
3 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
4  *
5  * Copyright 2008-2018 The OpenLDAP Foundation.
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted only as authorized by the OpenLDAP
10  * Public License.
11  *
12  * A copy of this license is available in the file LICENSE in the
13  * top-level directory of the distribution or, alternatively, at
14  * <http://www.OpenLDAP.org/license.html>.
15  */
16 /* ACKNOWLEDGEMENTS:
17  * This work was initially developed by Howard Chu for inclusion
18  * in OpenLDAP Software. This work was sponsored by MySQL.
19  */
20
21 #include "portable.h"
22
23 #include <stdio.h>
24 #include <ac/string.h>
25 #include <ac/errno.h>
26 #include <lutil.h>
27
28 #include "back-ndb.h"
29
30 /* For reference only */
31 typedef struct MedVar {
32         Int16 len;      /* length is always little-endian */
33         char buf[1024];
34 } MedVar;
35
/* Forward declarations of the two AVL callbacks defined below; both are
 * passed to avl_find()/avl_insert() elsewhere in this file.
 */
extern "C" {
	static int ndb_oc_dup_err( void *v1, void *v2 );
	static int ndb_name_cmp( const void *v1, const void *v2 );
}
40
41 static int
42 ndb_name_cmp( const void *v1, const void *v2 )
43 {
44         NdbOcInfo *oc1 = (NdbOcInfo *)v1, *oc2 = (NdbOcInfo *)v2;
45         return ber_bvstrcasecmp( &oc1->no_name, &oc2->no_name );
46 }
47
48 static int
49 ndb_oc_dup_err( void *v1, void *v2 )
50 {
51         NdbOcInfo *oc = (NdbOcInfo *)v2;
52
53         oc->no_oc = (ObjectClass *)v1;
54         return -1;
55 }
56
57 /* Find an existing NdbAttrInfo */
58 extern "C" NdbAttrInfo *
59 ndb_ai_find( struct ndb_info *ni, AttributeType *at )
60 {
61         NdbAttrInfo atmp;
62         atmp.na_name = at->sat_cname;
63
64         return (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
65 }
66
67 /* Find or create an NdbAttrInfo */
68 extern "C" NdbAttrInfo *
69 ndb_ai_get( struct ndb_info *ni, struct berval *aname )
70 {
71         NdbAttrInfo atmp, *ai;
72         atmp.na_name = *aname;
73
74         ai = (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
75         if ( !ai ) {
76                 const char *text;
77                 AttributeDescription *ad = NULL;
78
79                 if ( slap_bv2ad( aname, &ad, &text ))
80                         return NULL;
81
82                 ai = (NdbAttrInfo *)ch_malloc( sizeof( NdbAttrInfo ));
83                 ai->na_desc = ad;
84                 ai->na_attr = ai->na_desc->ad_type;
85                 ai->na_name = ai->na_attr->sat_cname;
86                 ai->na_oi = NULL;
87                 ai->na_flag = 0;
88                 ai->na_ixcol = 0;
89                 ai->na_len = ai->na_attr->sat_atype.at_syntax_len;
90                 /* Reasonable default */
91                 if ( !ai->na_len ) {
92                         if ( ai->na_attr->sat_syntax == slap_schema.si_syn_distinguishedName )
93                                 ai->na_len = 1024;
94                         else
95                                 ai->na_len = 128;
96                 }
97                 /* Arbitrary limit */
98                 if ( ai->na_len > 1024 )
99                         ai->na_len = 1024;
100                 avl_insert( &ni->ni_ai_tree, ai, ndb_name_cmp, avl_dup_error );
101         }
102         return ai;
103 }
104
105 static int
106 ndb_ai_check( struct ndb_info *ni, NdbOcInfo *oci, AttributeType **attrs, char **ptr, int *col,
107         int create )
108 {
109         NdbAttrInfo *ai;
110         int i;
111
112         for ( i=0; attrs[i]; i++ ) {
113                 if ( attrs[i] == slap_schema.si_ad_objectClass->ad_type )
114                         continue;
115                 /* skip attrs that are in a superior */
116                 if ( oci->no_oc && oci->no_oc->soc_sups ) {
117                         int j, k, found=0;
118                         ObjectClass *oc;
119                         for ( j=0; oci->no_oc->soc_sups[j]; j++ ) {
120                                 oc = oci->no_oc->soc_sups[j];
121                                 if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
122                                         continue;
123                                 if ( oc->soc_required ) {
124                                         for ( k=0; oc->soc_required[k]; k++ ) {
125                                                 if ( attrs[i] == oc->soc_required[k] ) {
126                                                         found = 1;
127                                                         break;
128                                                 }
129                                         }
130                                         if ( found ) break;
131                                 }
132                                 if ( oc->soc_allowed ) {
133                                         for ( k=0; oc->soc_allowed[k]; k++ ) {
134                                                 if ( attrs[i] == oc->soc_allowed[k] ) {
135                                                         found = 1;
136                                                         break;
137                                                 }
138                                         }
139                                         if ( found ) break;
140                                 }
141                         }
142                         if ( found )
143                                 continue;
144                 }
145
146                 ai = ndb_ai_get( ni, &attrs[i]->sat_cname );
147                 if ( !ai ) {
148                         /* can never happen */
149                         return LDAP_OTHER;
150                 }
151
152                 /* An attrset may have already been connected */
153                 if (( oci->no_flag & NDB_INFO_ATSET ) && ai->na_oi == oci )
154                         continue;
155
156                 /* An indexed attr is defined before its OC is */
157                 if ( !ai->na_oi ) {
158                         ai->na_oi = oci;
159                         ai->na_column = (*col)++;
160                 }
161
162                 oci->no_attrs[oci->no_nattrs++] = ai;
163
164                 /* An attrset attr may already be defined */
165                 if ( ai->na_oi != oci ) {
166                         int j;
167                         for ( j=0; j<oci->no_nsets; j++ )
168                                 if ( oci->no_sets[j] == ai->na_oi ) break;
169                         if ( j >= oci->no_nsets ) {
170                                 /* FIXME: data loss if more sets are in use */
171                                 if ( oci->no_nsets < NDB_MAX_OCSETS ) {
172                                         oci->no_sets[oci->no_nsets++] = ai->na_oi;
173                                 }
174                         }
175                         continue;
176                 }
177
178                 if ( create ) {
179                         if ( ai->na_flag & NDB_INFO_ATBLOB ) {
180                                 *ptr += sprintf( *ptr, ", `%s` BLOB", ai->na_attr->sat_cname.bv_val );
181                         } else {
182                                 *ptr += sprintf( *ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
183                                         ai->na_len );
184                         }
185                 }
186         }
187         return 0;
188 }
189
190 static int
191 ndb_oc_create( struct ndb_info *ni, NdbOcInfo *oci, int create )
192 {
193         char buf[4096], *ptr;
194         int i, rc = 0, col;
195
196         if ( create ) {
197                 ptr = buf + sprintf( buf,
198                         "CREATE TABLE `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
199                         oci->no_table.bv_val );
200         }
201
202         col = 0;
203         if ( oci->no_oc->soc_required ) {
204                 for ( i=0; oci->no_oc->soc_required[i]; i++ );
205                 col += i;
206         }
207         if ( oci->no_oc->soc_allowed ) {
208                 for ( i=0; oci->no_oc->soc_allowed[i]; i++ );
209                 col += i;
210         }
211         /* assume all are present */
212         oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
213
214         col = 2;
215         ldap_pvt_thread_rdwr_wlock( &ni->ni_ai_rwlock );
216         if ( oci->no_oc->soc_required ) {
217                 rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, &ptr, &col, create );
218         }
219         if ( !rc && oci->no_oc->soc_allowed ) {
220                 rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, &ptr, &col, create );
221         }
222         ldap_pvt_thread_rdwr_wunlock( &ni->ni_ai_rwlock );
223
224         /* shrink down to just the needed size */
225         oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
226                 oci->no_nattrs * sizeof(struct ndb_attrinfo *));
227
228         if ( create ) {
229                 ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
230                 rc = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
231                 if ( rc ) {
232                         Debug( LDAP_DEBUG_ANY,
233                                 "ndb_oc_create: CREATE TABLE %s failed, %s (%d)\n",
234                                 oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
235                 }
236         }
237         return rc;
238 }
239
240 /* Read table definitions from the DB and populate ObjectClassInfo */
241 extern "C" int
242 ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
243 {
244         const NdbDictionary::Table *myTable;
245         const NdbDictionary::Column *myCol;
246         NdbOcInfo *oci, octmp;
247         NdbAttrInfo *ai;
248         ObjectClass *oc;
249         NdbDictionary::Dictionary::List myList;
250         struct berval bv;
251         int i, j, rc, col;
252
253         rc = myDict->listObjects( myList, NdbDictionary::Object::UserTable );
254         /* Populate our objectClass structures */
255         for ( i=0; i<myList.count; i++ ) {
256                 /* Ignore other DBs */
257                 if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
258                         continue;
259                 /* Ignore internal tables */
260                 if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
261                         continue;
262                 ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
263                 oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
264                 if ( oci )
265                         continue;
266
267                 oc = oc_bvfind( &octmp.no_name );
268                 if ( !oc ) {
269                         /* undefined - shouldn't happen */
270                         continue;
271                 }
272                 myTable = myDict->getTable( myList.elements[i].name );
273                 oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
274                 oci->no_table.bv_val = (char *)(oci+1);
275                 strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
276                 oci->no_table.bv_len = oc->soc_cname.bv_len;
277                 oci->no_name = oci->no_table;
278                 oci->no_oc = oc;
279                 oci->no_flag = 0;
280                 oci->no_nsets = 0;
281                 oci->no_nattrs = 0;
282                 col = 0;
283                 /* Make space for all attrs, even tho sups will be dropped */
284                 if ( oci->no_oc->soc_required ) {
285                         for ( j=0; oci->no_oc->soc_required[j]; j++ );
286                         col = j;
287                 }
288                 if ( oci->no_oc->soc_allowed ) {
289                         for ( j=0; oci->no_oc->soc_allowed[j]; j++ );
290                         col += j;
291                 }
292                 oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
293                 avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
294
295                 col = myTable->getNoOfColumns();
296                 /* Skip 0 and 1, eid and vid */
297                 for ( j = 2; j<col; j++ ) {
298                         myCol = myTable->getColumn( j );
299                         ber_str2bv( myCol->getName(), 0, 0, &bv );
300                         ai = ndb_ai_get( ni, &bv );
301                         /* shouldn't happen */
302                         if ( !ai )
303                                 continue;
304                         ai->na_oi = oci;
305                         ai->na_column = j;
306                         ai->na_len = myCol->getLength();
307                         if ( myCol->getType() == NdbDictionary::Column::Blob )
308                                 ai->na_flag |= NDB_INFO_ATBLOB;
309                 }
310         }
311         /* Link to any attrsets */
312         for ( i=0; i<myList.count; i++ ) {
313                 /* Ignore other DBs */
314                 if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
315                         continue;
316                 /* Ignore internal tables */
317                 if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
318                         continue;
319                 ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
320                 oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
321                 /* shouldn't happen */
322                 if ( !oci )
323                         continue;
324                 col = 2;
325                 if ( oci->no_oc->soc_required ) {
326                         rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, NULL, &col, 0 );
327                 }
328                 if ( oci->no_oc->soc_allowed ) {
329                         rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, NULL, &col, 0 );
330                 }
331                 /* shrink down to just the needed size */
332                 oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
333                         oci->no_nattrs * sizeof(struct ndb_attrinfo *));
334         }
335         return 0;
336 }
337
338 static int
339 ndb_oc_list( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
340         struct berval *oname, int implied, NdbOcs *out )
341 {
342         const NdbDictionary::Table *myTable;
343         NdbOcInfo *oci, octmp;
344         ObjectClass *oc;
345         int i, rc;
346
347         /* shortcut top */
348         if ( ber_bvstrcasecmp( oname, &slap_schema.si_oc_top->soc_cname )) {
349                 octmp.no_name = *oname;
350                 oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
351                 if ( oci ) {
352                         oc = oci->no_oc;
353                 } else {
354                         oc = oc_bvfind( oname );
355                         if ( !oc ) {
356                                 /* undefined - shouldn't happen */
357                                 return LDAP_INVALID_SYNTAX;
358                         }
359                 }
360                 if ( oc->soc_sups ) {
361                         int i;
362
363                         for ( i=0; oc->soc_sups[i]; i++ ) {
364                                 rc = ndb_oc_list( ni, myDict, &oc->soc_sups[i]->soc_cname, 1, out );
365                                 if ( rc ) return rc;
366                         }
367                 }
368         } else {
369                 oc = slap_schema.si_oc_top;
370         }
371         /* Only insert once */
372         for ( i=0; i<out->no_ntext; i++ )
373                 if ( out->no_text[i].bv_val == oc->soc_cname.bv_val )
374                         break;
375         if ( i == out->no_ntext ) {
376                 for ( i=0; i<out->no_nitext; i++ )
377                         if ( out->no_itext[i].bv_val == oc->soc_cname.bv_val )
378                                 break;
379                 if ( i == out->no_nitext ) {
380                         if ( implied )
381                                 out->no_itext[out->no_nitext++] = oc->soc_cname;
382                         else
383                                 out->no_text[out->no_ntext++] = oc->soc_cname;
384                 }
385         }
386
387         /* ignore top, etc... */
388         if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
389                 return 0;
390
391         if ( !oci ) {
392                 ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
393                 oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
394                 oci->no_table.bv_val = (char *)(oci+1);
395                 strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
396                 oci->no_table.bv_len = oc->soc_cname.bv_len;
397                 oci->no_name = oci->no_table;
398                 oci->no_oc = oc;
399                 oci->no_flag = 0;
400                 oci->no_nsets = 0;
401                 oci->no_nattrs = 0;
402                 ldap_pvt_thread_rdwr_wlock( &ni->ni_oc_rwlock );
403                 if ( avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, ndb_oc_dup_err )) {
404                         octmp.no_oc = oci->no_oc;
405                         ch_free( oci );
406                         oci = (NdbOcInfo *)octmp.no_oc;
407                 }
408                 /* see if the oc table already exists in the DB */
409                 myTable = myDict->getTable( oci->no_table.bv_val );
410                 rc = ndb_oc_create( ni, oci, myTable == NULL );
411                 ldap_pvt_thread_rdwr_wunlock( &ni->ni_oc_rwlock );
412                 ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
413                 if ( rc ) return rc;
414         }
415         /* Only insert once */
416         for ( i=0; i<out->no_ninfo; i++ )
417                 if ( out->no_info[i] == oci )
418                         break;
419         if ( i == out->no_ninfo )
420                 out->no_info[out->no_ninfo++] = oci;
421         return 0;
422 }
423
424 extern "C" int
425 ndb_aset_get( struct ndb_info *ni, struct berval *sname, struct berval *attrs, NdbOcInfo **ret )
426 {
427         NdbOcInfo *oci, octmp;
428         int i, rc;
429
430         octmp.no_name = *sname;
431         oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
432         if ( oci )
433                 return LDAP_ALREADY_EXISTS;
434
435         for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ ) {
436                 if ( !at_bvfind( &attrs[i] ))
437                         return LDAP_NO_SUCH_ATTRIBUTE;
438         }
439         i++;
440
441         oci = (NdbOcInfo *)ch_calloc( 1, sizeof( NdbOcInfo ) + sizeof( ObjectClass ) +
442                 i*sizeof(AttributeType *) + sname->bv_len+1 );
443         oci->no_oc = (ObjectClass *)(oci+1);
444         oci->no_oc->soc_required = (AttributeType **)(oci->no_oc+1);
445         oci->no_table.bv_val = (char *)(oci->no_oc->soc_required+i);
446
447         for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ )
448                 oci->no_oc->soc_required[i] = at_bvfind( &attrs[i] );
449
450         strcpy( oci->no_table.bv_val, sname->bv_val );
451         oci->no_table.bv_len = sname->bv_len;
452         oci->no_name = oci->no_table;
453         oci->no_oc->soc_cname = oci->no_name;
454         oci->no_flag = NDB_INFO_ATSET;
455
456         if ( !ber_bvcmp( sname, &slap_schema.si_oc_extensibleObject->soc_cname ))
457                 oci->no_oc->soc_kind = slap_schema.si_oc_extensibleObject->soc_kind;
458
459         rc = ndb_oc_create( ni, oci, 0 );
460         if ( !rc )
461                 rc = avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
462         if ( rc ) {
463                 ch_free( oci );
464         } else {
465                 *ret = oci;
466         }
467         return rc;
468 }
469
470 extern "C" int
471 ndb_aset_create( struct ndb_info *ni, NdbOcInfo *oci )
472 {
473         char buf[4096], *ptr;
474         NdbAttrInfo *ai;
475         int i;
476
477         ptr = buf + sprintf( buf,
478                 "CREATE TABLE IF NOT EXISTS `%s` (eid bigint unsigned NOT NULL, vid int unsigned NOT NULL",
479                 oci->no_table.bv_val );
480
481         for ( i=0; i<oci->no_nattrs; i++ ) {
482                 if ( oci->no_attrs[i]->na_oi != oci )
483                         continue;
484                 ai = oci->no_attrs[i];
485                 ptr += sprintf( ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
486                         ai->na_len );
487                 if ( ai->na_flag & NDB_INFO_INDEX ) {
488                         ptr += sprintf( ptr, ", INDEX (`%s`)", ai->na_attr->sat_cname.bv_val );
489                 }
490         }
491         ptr = lutil_strcopy( ptr, ", PRIMARY KEY(eid, vid) ) ENGINE=ndb PARTITION BY KEY(eid)" );
492         i = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
493         if ( i ) {
494                 Debug( LDAP_DEBUG_ANY,
495                         "ndb_aset_create: CREATE TABLE %s failed, %s (%d)\n",
496                         oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
497         }
498         return i;
499 }
500
501 static int
502 ndb_oc_check( BackendDB *be, Ndb *ndb,
503         struct berval *ocsin, NdbOcs *out )
504 {
505         struct ndb_info *ni = (struct ndb_info *) be->be_private;
506         const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
507
508         int i, rc = 0;
509
510         out->no_ninfo = 0;
511         out->no_ntext = 0;
512         out->no_nitext = 0;
513
514         /* Find all objectclasses and their superiors. List
515          * the superiors first.
516          */
517
518         ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
519         for ( i=0; !BER_BVISNULL( &ocsin[i] ); i++ ) {
520                 rc = ndb_oc_list( ni, myDict, &ocsin[i], 0, out );
521                 if ( rc ) break;
522         }
523         ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
524         return rc;
525 }
526
/* Per-value change codes computed in ndb_oc_attrs(). V_REP carries both
 * the insert and the delete bit, so a test of (changed & V_INS) is true
 * for an insert and for a replace, and false for a delete.
 */
#define V_INS	0x01
#define V_DEL	0x02
#define V_REP	(V_INS|V_DEL)

/* Set to 1 by ndb_oc_attrs() when it writes a blob in tool mode; read by
 * ndb_entry_put_data() to decide whether to execute the transaction early.
 */
static int ndb_flush_blobs;
532
533 /* set all the unique attrs of this objectclass into the table
534  */
535 extern "C" int
536 ndb_oc_attrs(
537         NdbTransaction *txn,
538         const NdbDictionary::Table *myTable,
539         Entry *e,
540         NdbOcInfo *no,
541         NdbAttrInfo **attrs,
542         int nattrs,
543         Attribute *old
544 )
545 {
546         char buf[65538], *ptr;
547         Attribute **an, **ao, *a;
548         NdbOperation *myop;
549         int i, j, max = 0;
550         int changed, rc;
551         Uint64 eid = e->e_id;
552
553         if ( !nattrs )
554                 return 0;
555
556         an = (Attribute **)ch_malloc( 2 * nattrs * sizeof(Attribute *));
557         ao = an + nattrs;
558
559         /* Turn lists of attrs into arrays for easier access */
560         for ( i=0; i<nattrs; i++ ) {
561                 if ( attrs[i]->na_oi != no ) {
562                         an[i] = NULL;
563                         ao[i] = NULL;
564                         continue;
565                 }
566                 for ( a=e->e_attrs; a; a=a->a_next ) {
567                         if ( a->a_desc == slap_schema.si_ad_objectClass )
568                                 continue;
569                         if ( a->a_desc->ad_type == attrs[i]->na_attr ) {
570                                 /* Don't process same attr twice */
571                                 if ( a->a_flags & SLAP_ATTR_IXADD )
572                                         a = NULL;
573                                 else
574                                         a->a_flags |= SLAP_ATTR_IXADD;
575                                 break;
576                         }
577                 }
578                 an[i] = a;
579                 if ( a && a->a_numvals > max )
580                         max = a->a_numvals;
581                 for ( a=old; a; a=a->a_next ) {
582                         if ( a->a_desc == slap_schema.si_ad_objectClass )
583                                 continue;
584                         if ( a->a_desc->ad_type == attrs[i]->na_attr )
585                                 break;
586                 }
587                 ao[i] = a;
588                 if ( a && a->a_numvals > max )
589                         max = a->a_numvals;
590         }
591
592         for ( i=0; i<max; i++ ) {
593                 myop = NULL;
594                 for ( j=0; j<nattrs; j++ ) {
595                         if ( !an[j] && !ao[j] )
596                                 continue;
597                         changed = 0;
598                         if ( an[j] && an[j]->a_numvals > i ) {
599                                 /* both old and new are present, compare for changes */
600                                 if ( ao[j] && ao[j]->a_numvals > i ) {
601                                         if ( ber_bvcmp( &ao[j]->a_nvals[i], &an[j]->a_nvals[i] ))
602                                                 changed = V_REP;
603                                 } else {
604                                         changed = V_INS;
605                                 }
606                         } else {
607                                 if ( ao[j] && ao[j]->a_numvals > i )
608                                         changed = V_DEL;
609                         }
610                         if ( changed ) {
611                                 if ( !myop ) {
612                                         rc = LDAP_OTHER;
613                                         myop = txn->getNdbOperation( myTable );
614                                         if ( !myop ) {
615                                                 goto done;
616                                         }
617                                         if ( old ) {
618                                                 if ( myop->writeTuple()) {
619                                                         goto done;
620                                                 }
621                                         } else {
622                                                 if ( myop->insertTuple()) {
623                                                         goto done;
624                                                 }
625                                         }
626                                         if ( myop->equal( EID_COLUMN, eid )) {
627                                                 goto done;
628                                         }
629                                         if ( myop->equal( VID_COLUMN, i )) {
630                                                 goto done;
631                                         }
632                                 }
633                                 if ( attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
634                                         NdbBlob *myBlob = myop->getBlobHandle( attrs[j]->na_column );
635                                         rc = LDAP_OTHER;
636                                         if ( !myBlob ) {
637                                                 Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: getBlobHandle failed %s (%d)\n",
638                                                         myop->getNdbError().message, myop->getNdbError().code, 0 );
639                                                 goto done;
640                                         }
641                                         if ( slapMode & SLAP_TOOL_MODE )
642                                                 ndb_flush_blobs = 1;
643                                         if ( changed & V_INS ) {
644                                                 if ( myBlob->setValue( an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len )) {
645                                                         Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
646                                                                 myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
647                                                         goto done;
648                                                 }
649                                         } else {
650                                                 if ( myBlob->setValue( NULL, 0 )) {
651                                                         Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: blob->setValue failed %s (%d)\n",
652                                                                 myBlob->getNdbError().message, myBlob->getNdbError().code, 0 );
653                                                         goto done;
654                                                 }
655                                         }
656                                 } else {
657                                         if ( changed & V_INS ) {
658                                                 if ( an[j]->a_vals[i].bv_len > attrs[j]->na_len ) {
659                                                         Debug( LDAP_DEBUG_ANY, "ndb_oc_attrs: attribute %s too long for column\n",
660                                                                 attrs[j]->na_name.bv_val, 0, 0 );
661                                                         rc = LDAP_CONSTRAINT_VIOLATION;
662                                                         goto done;
663                                                 }
664                                                 ptr = buf;
665                                                 *ptr++ = an[j]->a_vals[i].bv_len & 0xff;
666                                                 if ( attrs[j]->na_len > 255 ) {
667                                                         /* MedVar */
668                                                         *ptr++ = an[j]->a_vals[i].bv_len >> 8;
669                                                 }
670                                                 memcpy( ptr, an[j]->a_vals[i].bv_val, an[j]->a_vals[i].bv_len );
671                                                 ptr = buf;
672                                         } else {
673                                                 ptr = NULL;
674                                         }
675                                         if ( myop->setValue( attrs[j]->na_column, ptr )) {
676                                                 rc = LDAP_OTHER;
677                                                 goto done;
678                                         }
679                                 }
680                         }
681                 }
682         }
683         rc = LDAP_SUCCESS;
684 done:
685         ch_free( an );
686         if ( rc ) {
687                 Debug( LDAP_DEBUG_TRACE, "ndb_oc_attrs: failed %s (%d)\n",
688                         myop->getNdbError().message, myop->getNdbError().code, 0 );
689         }
690         return rc;
691 }
692
693 static int
694 ndb_oc_put(
695         const NdbDictionary::Dictionary *myDict,
696         NdbTransaction *txn, NdbOcInfo *no, Entry *e )
697 {
698         const NdbDictionary::Table *myTable;
699         int i, rc;
700
701         for ( i=0; i<no->no_nsets; i++ ) {
702                 rc = ndb_oc_put( myDict, txn, no->no_sets[i], e );
703                 if ( rc )
704                         return rc;
705         }
706
707         myTable = myDict->getTable( no->no_table.bv_val );
708         if ( !myTable )
709                 return LDAP_OTHER;
710
711         return ndb_oc_attrs( txn, myTable, e, no, no->no_attrs, no->no_nattrs, NULL );
712 }
713
714 /* This is now only used for Adds. Modifies call ndb_oc_attrs directly. */
715 extern "C" int
716 ndb_entry_put_data(
717         BackendDB *be,
718         NdbArgs *NA
719 )
720 {
721         struct ndb_info *ni = (struct ndb_info *) be->be_private;
722         Attribute *aoc;
723         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
724         NdbOcs myOcs;
725         int i, rc;
726
727         /* Get the entry's objectClass attribute */
728         aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
729         if ( !aoc )
730                 return LDAP_OTHER;
731
732         ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
733         myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
734
735         /* Walk thru objectclasses, find all the attributes belonging to a class */
736         for ( i=0; i<myOcs.no_ninfo; i++ ) {
737                 rc = ndb_oc_put( myDict, NA->txn, myOcs.no_info[i], NA->e );
738                 if ( rc ) return rc;
739         }
740
741         /* slapadd tries to batch multiple entries per txn, but entry data is
742          * transient and blob data is required to remain valid for the whole txn.
743          * So we need to flush blobs before their source data disappears.
744          */
745         if (( slapMode & SLAP_TOOL_MODE ) && ndb_flush_blobs )
746                 NA->txn->execute( NdbTransaction::NoCommit );
747
748         return 0;
749 }
750
751 static void
752 ndb_oc_get( Operation *op, NdbOcInfo *no, int *j, int *nocs, NdbOcInfo ***oclist )
753 {
754         int i;
755         NdbOcInfo  **ol2;
756
757         for ( i=0; i<no->no_nsets; i++ ) {
758                 ndb_oc_get( op, no->no_sets[i], j, nocs, oclist );
759         }
760
761         /* Don't insert twice */
762         ol2 = *oclist;
763         for ( i=0; i<*j; i++ )
764                 if ( ol2[i] == no )
765                         return;
766
767         if ( *j >= *nocs ) {
768                 *nocs *= 2;
769                 ol2 = (NdbOcInfo **)op->o_tmprealloc( *oclist, *nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
770                 *oclist = ol2;
771         }
772         ol2 = *oclist;
773         ol2[(*j)++] = no;
774 }
775
776 /* Retrieve attribute data for given entry. The entry's DN and eid should
777  * already be populated.
778  */
779 extern "C" int
780 ndb_entry_get_data(
781         Operation *op,
782         NdbArgs *NA,
783         int update
784 )
785 {
786         struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
787         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
788         const NdbDictionary::Table *myTable;
789         NdbIndexScanOperation **myop = NULL;
790         Uint64 eid;
791
792         Attribute *a;
793         NdbOcs myOcs;
794         NdbOcInfo *oci, **oclist = NULL;
795         char abuf[65536], *ptr, **attrs = NULL;
796         struct berval bv[2];
797         int *ocx = NULL;
798
799         /* FIXME: abuf should be dynamically allocated */
800
801         int i, j, k, nocs, nattrs, rc = LDAP_OTHER;
802
803         eid = NA->e->e_id;
804
805         ndb_oc_check( op->o_bd, NA->ndb, NA->ocs, &myOcs );
806         myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
807         nocs = myOcs.no_ninfo;
808
809         oclist = (NdbOcInfo **)op->o_tmpcalloc( 1, nocs * sizeof(NdbOcInfo *), op->o_tmpmemctx );
810
811         for ( i=0, j=0; i<myOcs.no_ninfo; i++ ) {
812                 ndb_oc_get( op, myOcs.no_info[i], &j, &nocs, &oclist );
813         }
814
815         nocs = j;
816         nattrs = 0;
817         for ( i=0; i<nocs; i++ )
818                 nattrs += oclist[i]->no_nattrs;
819
820         ocx = (int *)op->o_tmpalloc( nocs * sizeof(int), op->o_tmpmemctx );
821
822         attrs = (char **)op->o_tmpalloc( nattrs * sizeof(char *), op->o_tmpmemctx );
823
824         myop = (NdbIndexScanOperation **)op->o_tmpalloc( nattrs * sizeof(NdbIndexScanOperation *), op->o_tmpmemctx );
825
826         k = 0;
827         ptr = abuf;
828         for ( i=0; i<nocs; i++ ) {
829                 oci = oclist[i];
830
831                 myop[i] = NA->txn->getNdbIndexScanOperation( "PRIMARY", oci->no_table.bv_val );
832                 if ( !myop[i] )
833                         goto leave;
834                 if ( myop[i]->readTuples( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead ))
835                         goto leave;
836                 if ( myop[i]->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
837                         goto leave;
838
839                 for ( j=0; j<oci->no_nattrs; j++ ) {
840                         if ( oci->no_attrs[j]->na_oi != oci )
841                                 continue;
842                         if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
843                                 NdbBlob *bi = myop[i]->getBlobHandle( oci->no_attrs[j]->na_column );
844                                 attrs[k++] = (char *)bi;
845                         } else {
846                                 attrs[k] = ptr;
847                                 *ptr++ = 0;
848                                 if ( oci->no_attrs[j]->na_len > 255 )
849                                         *ptr++ = 0;
850                                 ptr += oci->no_attrs[j]->na_len + 1;
851                                 myop[i]->getValue( oci->no_attrs[j]->na_column, attrs[k++] );
852                         }
853                 }
854                 ocx[i] = k;
855         }
856         /* Must use IgnoreError, because an entry with multiple objectClasses may not
857          * actually have attributes defined in each class / table.
858          */
859         if ( NA->txn->execute( NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 )
860                 goto leave;
861
862         /* count results */
863         for ( i=0; i<nocs; i++ ) {
864                 if (( j = myop[i]->nextResult(true) )) {
865                         if ( j < 0 ) {
866                                 Debug( LDAP_DEBUG_TRACE,
867                                         "ndb_entry_get_data: first nextResult(%d) failed: %s (%d)\n",
868                                         i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
869                         }
870                         myop[i] = NULL;
871                 }
872         }
873
874         nattrs = 0;
875         k = 0;
876         for ( i=0; i<nocs; i++ ) {
877                 oci = oclist[i];
878                 for ( j=0; j<oci->no_nattrs; j++ ) {
879                         unsigned char *buf;
880                         int len;
881                         if ( oci->no_attrs[j]->na_oi != oci )
882                                 continue;
883                         if ( !myop[i] ) {
884                                 attrs[k] = NULL;
885                         } else if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
886                                 void *vi = attrs[k];
887                                 NdbBlob *bi = (NdbBlob *)vi;
888                                 int isNull;
889                                 bi->getNull( isNull );
890                                 if ( !isNull ) {
891                                         nattrs++;
892                                 } else {
893                                         attrs[k] = NULL;
894                                 }
895                         } else {
896                                 buf = (unsigned char *)attrs[k];
897                                 len = buf[0];
898                                 if ( oci->no_attrs[j]->na_len > 255 ) {
899                                         /* MedVar */
900                                         len |= (buf[1] << 8);
901                                 }
902                                 if ( len ) {
903                                         nattrs++;
904                                 } else {
905                                         attrs[k] = NULL;
906                                 }
907                         }
908                         k++;
909                 }
910         }
911
912         a = attrs_alloc( nattrs+1 );
913         NA->e->e_attrs = a;
914
915         a->a_desc = slap_schema.si_ad_objectClass;
916         a->a_vals = NULL;
917         ber_bvarray_dup_x( &a->a_vals, NA->ocs, NULL );
918         a->a_nvals = a->a_vals;
919         a->a_numvals = myOcs.no_ntext;
920
921         BER_BVZERO( &bv[1] );
922
923         do {
924                 a = NA->e->e_attrs->a_next;
925                 k = 0;
926                 for ( i=0; i<nocs; k=ocx[i], i++ ) {
927                         oci = oclist[i];
928                         for ( j=0; j<oci->no_nattrs; j++ ) {
929                                 unsigned char *buf;
930                                 struct berval nbv;
931                                 if ( oci->no_attrs[j]->na_oi != oci )
932                                         continue;
933                                 buf = (unsigned char *)attrs[k++];
934                                 if ( !buf )
935                                         continue;
936                                 if ( !myop[i] ) {
937                                         a=a->a_next;
938                                         continue;
939                                 }
940                                 if ( oci->no_attrs[j]->na_flag & NDB_INFO_ATBLOB ) {
941                                         void *vi = (void *)buf;
942                                         NdbBlob *bi = (NdbBlob *)vi;
943                                         Uint64 len;
944                                         Uint32 len2;
945                                         int isNull;
946                                         bi->getNull( isNull );
947                                         if ( isNull ) {
948                                                 a = a->a_next;
949                                                 continue;
950                                         }
951                                         bi->getLength( len );
952                                         bv[0].bv_len = len;
953                                         bv[0].bv_val = (char *)ch_malloc( len+1 );
954                                         len2 = len;
955                                         if ( bi->readData( bv[0].bv_val, len2 )) {
956                                                 Debug( LDAP_DEBUG_TRACE,
957                                                         "ndb_entry_get_data: blob readData failed: %s (%d), len %d\n",
958                                                         bi->getNdbError().message, bi->getNdbError().code, len2 );
959                                         }
960                                         bv[0].bv_val[len] = '\0';
961                                         ber_bvarray_add_x( &a->a_vals, bv, NULL );
962                                 } else {
963                                         bv[0].bv_len = buf[0];
964                                         if ( oci->no_attrs[j]->na_len > 255 ) {
965                                                 /* MedVar */
966                                                 bv[0].bv_len |= (buf[1] << 8);
967                                                 bv[0].bv_val = (char *)buf+2;
968                                                 buf[1] = 0;
969                                         } else {
970                                                 bv[0].bv_val = (char *)buf+1;
971                                         }
972                                         buf[0] = 0;
973                                         if ( bv[0].bv_len == 0 ) {
974                                                 a = a->a_next;
975                                                 continue;
976                                         }
977                                         bv[0].bv_val[bv[0].bv_len] = '\0';
978                                         value_add_one( &a->a_vals, bv );
979                                 }
980                                 a->a_desc = oci->no_attrs[j]->na_desc;
981                                 attr_normalize_one( a->a_desc, bv, &nbv, NULL );
982                                 a->a_numvals++;
983                                 if ( !BER_BVISNULL( &nbv )) {
984                                         ber_bvarray_add_x( &a->a_nvals, &nbv, NULL );
985                                 } else if ( !a->a_nvals ) {
986                                         a->a_nvals = a->a_vals;
987                                 }
988                                 a = a->a_next;
989                         }
990                 }
991                 k = 0;
992                 for ( i=0; i<nocs; i++ ) {
993                         if ( !myop[i] )
994                                 continue;
995                         if ((j = myop[i]->nextResult(true))) {
996                                 if ( j < 0 ) {
997                                         Debug( LDAP_DEBUG_TRACE,
998                                                 "ndb_entry_get_data: last nextResult(%d) failed: %s (%d)\n",
999                                                 i, myop[i]->getNdbError().message, myop[i]->getNdbError().code );
1000                                 }
1001                                 myop[i] = NULL;
1002                         } else {
1003                                 k = 1;
1004                         }
1005                 }
1006         } while ( k );
1007
1008         rc = 0;
1009 leave:
1010         if ( myop ) {
1011                 op->o_tmpfree( myop, op->o_tmpmemctx );
1012         }
1013         if ( attrs ) {
1014                 op->o_tmpfree( attrs, op->o_tmpmemctx );
1015         }
1016         if ( ocx ) {
1017                 op->o_tmpfree( ocx, op->o_tmpmemctx );
1018         }
1019         if ( oclist ) {
1020                 op->o_tmpfree( oclist, op->o_tmpmemctx );
1021         }
1022
1023         return rc;
1024 }
1025
1026 static int
1027 ndb_oc_del( 
1028         NdbTransaction *txn, Uint64 eid, NdbOcInfo *no )
1029 {
1030         NdbIndexScanOperation *myop;
1031         int i, rc;
1032
1033         for ( i=0; i<no->no_nsets; i++ ) {
1034                 rc = ndb_oc_del( txn, eid, no->no_sets[i] );
1035                 if ( rc ) return rc;
1036         }
1037
1038         myop = txn->getNdbIndexScanOperation( "PRIMARY", no->no_table.bv_val );
1039         if ( !myop )
1040                 return LDAP_OTHER;
1041         if ( myop->readTuples( NdbOperation::LM_Exclusive ))
1042                 return LDAP_OTHER;
1043         if ( myop->setBound( 0U, NdbIndexScanOperation::BoundEQ, &eid ))
1044                 return LDAP_OTHER;
1045
1046         txn->execute(NoCommit);
1047         while ( myop->nextResult(true) == 0) {
1048                 do {
1049                         myop->deleteCurrentTuple();
1050                 } while (myop->nextResult(false) == 0);
1051                 txn->execute(NoCommit);
1052         }
1053
1054         return 0;
1055 }
1056
1057 extern "C" int
1058 ndb_entry_del_data(
1059         BackendDB *be,
1060         NdbArgs *NA
1061 )
1062 {
1063         struct ndb_info *ni = (struct ndb_info *) be->be_private;
1064         Uint64 eid = NA->e->e_id;
1065         int i;
1066         NdbOcs myOcs;
1067
1068         ndb_oc_check( be, NA->ndb, NA->ocs, &myOcs );
1069         myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
1070
1071         for ( i=0; i<myOcs.no_ninfo; i++ ) {
1072                 if ( ndb_oc_del( NA->txn, eid, myOcs.no_info[i] ))
1073                         return LDAP_OTHER;
1074         }
1075
1076         return 0;
1077 }
1078
1079 extern "C" int
1080 ndb_dn2rdns(
1081         struct berval *dn,
1082         NdbRdns *rdns
1083 )
1084 {
1085         char *beg, *end;
1086         int i, len;
1087
1088         /* Walk thru RDNs */
1089         end = dn->bv_val + dn->bv_len;
1090         for ( i=0; i<NDB_MAX_RDNS; i++ ) {
1091                 for ( beg = end-1; beg > dn->bv_val; beg-- ) {
1092                         if (*beg == ',') {
1093                                 beg++;
1094                                 break;
1095                         }
1096                 }
1097                 if ( beg >= dn->bv_val ) {
1098                         len = end - beg;
1099                         /* RDN is too long */
1100                         if ( len > NDB_RDN_LEN )
1101                                 return LDAP_CONSTRAINT_VIOLATION;
1102                         memcpy( rdns->nr_buf[i]+1, beg, len );
1103                 } else {
1104                         break;
1105                 }
1106                 rdns->nr_buf[i][0] = len;
1107                 end = beg - 1;
1108         }
1109         /* Too many RDNs in DN */
1110         if ( i == NDB_MAX_RDNS && beg > dn->bv_val ) {
1111                         return LDAP_CONSTRAINT_VIOLATION;
1112         }
1113         rdns->nr_num = i;
1114         return 0;
1115 }
1116
1117 static int
1118 ndb_rdns2keys(
1119         NdbOperation *myop,
1120         NdbRdns *rdns
1121 )
1122 {
1123         int i;
1124         char dummy[2] = {0,0};
1125
1126         /* Walk thru RDNs */
1127         for ( i=0; i<rdns->nr_num; i++ ) {
1128                 if ( myop->equal( i+RDN_COLUMN, rdns->nr_buf[i] ))
1129                         return LDAP_OTHER;
1130         }
1131         for ( ; i<NDB_MAX_RDNS; i++ ) {
1132                 if ( myop->equal( i+RDN_COLUMN, dummy ))
1133                         return LDAP_OTHER;
1134         }
1135         return 0;
1136 }
1137
1138 /* Store the DN2ID_TABLE fields */
1139 extern "C" int
1140 ndb_entry_put_info(
1141         BackendDB *be,
1142         NdbArgs *NA,
1143         int update
1144 )
1145 {
1146         struct ndb_info *ni = (struct ndb_info *) be->be_private;
1147         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1148         const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1149         NdbOperation *myop;
1150         NdbAttrInfo *ai;
1151         Attribute *aoc, *a;
1152
1153         /* Get the entry's objectClass attribute; it's ok to be
1154          * absent on a fresh insert
1155          */
1156         aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
1157         if ( update && !aoc )
1158                 return LDAP_OBJECT_CLASS_VIOLATION;
1159
1160         myop = NA->txn->getNdbOperation( myTable );
1161         if ( !myop )
1162                 return LDAP_OTHER;
1163         if ( update ) {
1164                 if ( myop->updateTuple())
1165                         return LDAP_OTHER;
1166         } else {
1167                 if ( myop->insertTuple())
1168                         return LDAP_OTHER;
1169         }
1170
1171         if ( ndb_rdns2keys( myop, NA->rdns ))
1172                 return LDAP_OTHER;
1173
1174         /* Set entry ID */
1175         {
1176                 Uint64 eid = NA->e->e_id;
1177                 if ( myop->setValue( EID_COLUMN, eid ))
1178                         return LDAP_OTHER;
1179         }
1180
1181         /* Set list of objectClasses */
1182         /* List is <sp> <class> <sp> <class> <sp> ... so that
1183          * searches for " class " will yield accurate results
1184          */
1185         if ( aoc ) {
1186                 char *ptr, buf[sizeof(MedVar)];
1187                 NdbOcs myOcs;
1188                 int i;
1189
1190                 ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
1191                 ptr = buf+2;
1192                 *ptr++ = ' ';
1193                 for ( i=0; i<myOcs.no_ntext; i++ ) {
1194                         /* data loss... */
1195                         if ( ptr + myOcs.no_text[i].bv_len + 1 >= &buf[sizeof(buf)] )
1196                                 break;
1197                         ptr = lutil_strcopy( ptr, myOcs.no_text[i].bv_val );
1198                         *ptr++ = ' ';
1199                 }
1200
1201                 /* implicit classes */
1202                 if ( myOcs.no_nitext ) {
1203                         *ptr++ = '@';
1204                         *ptr++ = ' ';
1205                         for ( i=0; i<myOcs.no_nitext; i++ ) {
1206                                 /* data loss... */
1207                                 if ( ptr + myOcs.no_itext[i].bv_len + 1 >= &buf[sizeof(buf)] )
1208                                         break;
1209                                 ptr = lutil_strcopy( ptr, myOcs.no_itext[i].bv_val );
1210                                 *ptr++ = ' ';
1211                         }
1212                 }
1213
1214                 i = ptr - buf - 2;
1215                 buf[0] = i & 0xff;
1216                 buf[1] = i >> 8;
1217                 if ( myop->setValue( OCS_COLUMN, buf ))
1218                         return LDAP_OTHER;
1219         }
1220
1221         /* Set any indexed attrs */
1222         for ( a = NA->e->e_attrs; a; a=a->a_next ) {
1223                 ai = ndb_ai_find( ni, a->a_desc->ad_type );
1224                 if ( ai && ( ai->na_flag & NDB_INFO_INDEX )) {
1225                         char *ptr, buf[sizeof(MedVar)];
1226                         int len;
1227
1228                         ptr = buf+1;
1229                         len = a->a_vals[0].bv_len;
1230                         /* FIXME: data loss */
1231                         if ( len > ai->na_len )
1232                                 len = ai->na_len;
1233                         buf[0] = len & 0xff;
1234                         if ( ai->na_len > 255 ) {
1235                                 *ptr++ = len >> 8;
1236                         }
1237                         memcpy( ptr, a->a_vals[0].bv_val, len );
1238                         if ( myop->setValue( ai->na_ixcol, buf ))
1239                                 return LDAP_OTHER;
1240                 }
1241         }
1242
1243         return 0;
1244 }
1245
1246 extern "C" struct berval *
1247 ndb_str2bvarray(
1248         char *str,
1249         int len,
1250         char delim,
1251         void *ctx
1252 )
1253 {
1254         struct berval *list, tmp;
1255         char *beg;
1256         int i, num;
1257
1258         while ( *str == delim ) {
1259                 str++;
1260                 len--;
1261         }
1262
1263         while ( str[len-1] == delim ) {
1264                 str[--len] = '\0';
1265         }
1266
1267         for ( i = 1, beg = str;; i++ ) {
1268                 beg = strchr( beg, delim );
1269                 if ( !beg )
1270                         break;
1271                 if ( beg >= str + len )
1272                         break;
1273                 beg++;
1274         }
1275
1276         num = i;
1277         list = (struct berval *)slap_sl_malloc( (num+1)*sizeof(struct berval), ctx);
1278
1279         for ( i = 0, beg = str; i<num; i++ ) {
1280                 tmp.bv_val = beg;
1281                 beg = strchr( beg, delim );
1282                 if ( beg >= str + len )
1283                         beg = NULL;
1284                 if ( beg ) {
1285                         tmp.bv_len = beg - tmp.bv_val;
1286                 } else {
1287                         tmp.bv_len = len - (tmp.bv_val - str);
1288                 }
1289                 ber_dupbv_x( &list[i], &tmp, ctx );
1290                 beg++;
1291         }
1292
1293         BER_BVZERO( &list[i] );
1294         return list;
1295 }
1296
1297 extern "C" struct berval *
1298 ndb_ref2oclist(
1299         const char *ref,
1300         void *ctx
1301 )
1302 {
1303         char *implied;
1304
1305         /* MedVar */
1306         int len = ref[0] | (ref[1] << 8);
1307
1308         /* don't return the implied classes */
1309         implied = (char *)memchr( ref+2, '@', len );
1310         if ( implied ) {
1311                 len = implied - ref - 2;
1312                 *implied = '\0';
1313         }
1314
1315         return ndb_str2bvarray( (char *)ref+2, len, ' ', ctx );
1316 }
1317
1318 /* Retrieve the DN2ID_TABLE fields. Can call with NULL ocs if just verifying
1319  * the existence of a DN.
1320  */
1321 extern "C" int
1322 ndb_entry_get_info(
1323         Operation *op,
1324         NdbArgs *NA,
1325         int update,
1326         struct berval *matched
1327 )
1328 {
1329         struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1330         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1331         const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1332         NdbOperation *myop[NDB_MAX_RDNS];
1333         NdbRecAttr *eid[NDB_MAX_RDNS], *oc[NDB_MAX_RDNS];
1334         char idbuf[NDB_MAX_RDNS][2*sizeof(ID)];
1335         char ocbuf[NDB_MAX_RDNS][NDB_OC_BUFLEN];
1336
1337         if ( matched ) {
1338                 BER_BVZERO( matched );
1339         }
1340         if ( !myTable ) {
1341                 return LDAP_OTHER;
1342         }
1343
1344         myop[0] = NA->txn->getNdbOperation( myTable );
1345         if ( !myop[0] ) {
1346                 return LDAP_OTHER;
1347         }
1348
1349         if ( myop[0]->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead )) {
1350                 return LDAP_OTHER;
1351         }
1352
1353         if ( !NA->rdns->nr_num && ndb_dn2rdns( &NA->e->e_name, NA->rdns )) {
1354                 return LDAP_NO_SUCH_OBJECT;
1355         }
1356
1357         if ( ndb_rdns2keys( myop[0], NA->rdns )) {
1358                 return LDAP_OTHER;
1359         }
1360
1361         eid[0] = myop[0]->getValue( EID_COLUMN, idbuf[0] );
1362         if ( !eid[0] ) {
1363                 return LDAP_OTHER;
1364         }
1365
1366         ocbuf[0][0] = 0;
1367         ocbuf[0][1] = 0;
1368         if ( !NA->ocs ) {
1369                 oc[0] = myop[0]->getValue( OCS_COLUMN, ocbuf[0] );
1370                 if ( !oc[0] ) {
1371                         return LDAP_OTHER;
1372                 }
1373         }
1374
1375         if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1376                 return LDAP_OTHER;
1377         }
1378
1379         switch( myop[0]->getNdbError().code ) {
1380         case 0:
1381                 if ( !eid[0]->isNULL() && ( NA->e->e_id = eid[0]->u_64_value() )) {
1382                         /* If we didn't care about OCs, or we got them */
1383                         if ( NA->ocs || ocbuf[0][0] || ocbuf[0][1] ) {
1384                                 /* If wanted, return them */
1385                                 if ( !NA->ocs )
1386                                         NA->ocs = ndb_ref2oclist( ocbuf[0], op->o_tmpmemctx );
1387                                 break;
1388                         }
1389                 }
1390                 /* FALLTHRU */
1391         case NDB_NO_SUCH_OBJECT:        /* no such tuple: look for closest parent */
1392                 if ( matched ) {
1393                         int i, j, k;
1394                         char dummy[2] = {0,0};
1395
1396                         /* get to last RDN, then back up 1 */
1397                         k = NA->rdns->nr_num - 1;
1398
1399                         for ( i=0; i<k; i++ ) {
1400                                 myop[i] = NA->txn->getNdbOperation( myTable );
1401                                 if ( !myop[i] )
1402                                         return LDAP_OTHER;
1403                                 if ( myop[i]->readTuple( NdbOperation::LM_CommittedRead ))
1404                                         return LDAP_OTHER;
1405                                 for ( j=0; j<=i; j++ ) {
1406                                         if ( myop[i]->equal( j+RDN_COLUMN, NA->rdns->nr_buf[j] ))
1407                                                 return LDAP_OTHER;
1408                                 }
1409                                 for ( ;j<NDB_MAX_RDNS; j++ ) {
1410                                         if ( myop[i]->equal( j+RDN_COLUMN, dummy ))
1411                                                 return LDAP_OTHER;
1412                                 }
1413                                 eid[i] = myop[i]->getValue( EID_COLUMN, idbuf[i] );
1414                                 if ( !eid[i] ) {
1415                                         return LDAP_OTHER;
1416                                 }
1417                                 ocbuf[i][0] = 0;
1418                                 ocbuf[i][1] = 0;
1419                                 if ( !NA->ocs ) {
1420                                         oc[i] = myop[0]->getValue( OCS_COLUMN, ocbuf[i] );
1421                                         if ( !oc[i] ) {
1422                                                 return LDAP_OTHER;
1423                                         }
1424                                 }
1425                         }
1426                         if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1427                                 return LDAP_OTHER;
1428                         }
1429                         for ( --i; i>=0; i-- ) {
1430                                 if ( myop[i]->getNdbError().code == 0 ) {
1431                                         for ( j=0; j<=i; j++ )
1432                                                 matched->bv_len += NA->rdns->nr_buf[j][0];
1433                                         NA->erdns = NA->rdns->nr_num;
1434                                         NA->rdns->nr_num = j;
1435                                         matched->bv_len += i;
1436                                         matched->bv_val = NA->e->e_name.bv_val +
1437                                                 NA->e->e_name.bv_len - matched->bv_len;
1438                                         if ( !eid[i]->isNULL() )
1439                                                 NA->e->e_id = eid[i]->u_64_value();
1440                                         if ( !NA->ocs )
1441                                                 NA->ocs = ndb_ref2oclist( ocbuf[i], op->o_tmpmemctx );
1442                                         break;
1443                                 }
1444                         }
1445                 }
1446                 return LDAP_NO_SUCH_OBJECT;
1447         default:
1448                 return LDAP_OTHER;
1449         }
1450
1451         return 0;
1452 }
1453
1454 extern "C" int
1455 ndb_entry_del_info(
1456         BackendDB *be,
1457         NdbArgs *NA
1458 )
1459 {
1460         struct ndb_info *ni = (struct ndb_info *) be->be_private;
1461         const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1462         const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1463         NdbOperation *myop;
1464
1465         myop = NA->txn->getNdbOperation( myTable );
1466         if ( !myop )
1467                 return LDAP_OTHER;
1468         if ( myop->deleteTuple())
1469                 return LDAP_OTHER;
1470
1471         if ( ndb_rdns2keys( myop, NA->rdns ))
1472                 return LDAP_OTHER;
1473
1474         return 0;
1475 }
1476
1477 extern "C" int
1478 ndb_next_id(
1479         BackendDB *be,
1480         Ndb *ndb,
1481         ID *id
1482 )
1483 {
1484         struct ndb_info *ni = (struct ndb_info *) be->be_private;
1485         const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
1486         const NdbDictionary::Table *myTable = myDict->getTable( NEXTID_TABLE );
1487         Uint64 nid = 0;
1488         int rc;
1489
1490         if ( !myTable ) {
1491                 Debug( LDAP_DEBUG_ANY, "ndb_next_id: " NEXTID_TABLE " table is missing\n",
1492                         0, 0, 0 );
1493                 return LDAP_OTHER;
1494         }
1495
1496         rc = ndb->getAutoIncrementValue( myTable, nid, 1000 );
1497         if ( !rc )
1498                 *id = nid;
1499         return rc;
1500 }
1501
1502 extern "C" { static void ndb_thread_hfree( void *key, void *data ); };
1503 static void
1504 ndb_thread_hfree( void *key, void *data )
1505 {
1506         Ndb *ndb = (Ndb *)data;
1507         delete ndb;
1508 }
1509
1510 extern "C" int
1511 ndb_thread_handle(
1512         Operation *op,
1513         Ndb **ndb )
1514 {
1515         struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1516         void *data;
1517
1518         if ( ldap_pvt_thread_pool_getkey( op->o_threadctx, ni, &data, NULL )) {
1519                 Ndb *myNdb;
1520                 int rc;
1521                 ldap_pvt_thread_mutex_lock( &ni->ni_conn_mutex );
1522                 myNdb = new Ndb( ni->ni_cluster[ni->ni_nextconn++], ni->ni_dbname );
1523                 if ( ni->ni_nextconn >= ni->ni_nconns )
1524                         ni->ni_nextconn = 0;
1525                 ldap_pvt_thread_mutex_unlock( &ni->ni_conn_mutex );
1526                 if ( !myNdb ) {
1527                         return LDAP_OTHER;
1528                 }
1529                 rc = myNdb->init(1024);
1530                 if ( rc ) {
1531                         delete myNdb;
1532                         Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1533                                 rc, 0, 0 );
1534                         return rc;
1535                 }
1536                 data = (void *)myNdb;
1537                 if (( rc = ldap_pvt_thread_pool_setkey( op->o_threadctx, ni,
1538                         data, ndb_thread_hfree, NULL, NULL ))) {
1539                         delete myNdb;
1540                         Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1541                                 rc, 0, 0 );
1542                         return rc;
1543                 }
1544         }
1545         *ndb = (Ndb *)data;
1546         return 0;
1547 }
1548
1549 extern "C" int
1550 ndb_entry_get(
1551         Operation *op,
1552         struct berval *ndn,
1553         ObjectClass *oc,
1554         AttributeDescription *ad,
1555         int rw,
1556         Entry **ent )
1557 {
1558         struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1559         NdbArgs NA;
1560         Entry e = {0};
1561         int rc;
1562
1563         /* Get our NDB handle */
1564         rc = ndb_thread_handle( op, &NA.ndb );
1565
1566         NA.txn = NA.ndb->startTransaction();
1567         if( !NA.txn ) {
1568                 Debug( LDAP_DEBUG_TRACE,
1569                         LDAP_XSTRING(ndb_entry_get) ": startTransaction failed: %s (%d)\n",
1570                         NA.ndb->getNdbError().message, NA.ndb->getNdbError().code, 0 );
1571                 return 1;
1572         }
1573
1574         e.e_name = *ndn;
1575         NA.e = &e;
1576         /* get entry */
1577         {
1578                 NdbRdns rdns;
1579                 rdns.nr_num = 0;
1580                 NA.ocs = NULL;
1581                 NA.rdns = &rdns;
1582                 rc = ndb_entry_get_info( op, &NA, rw, NULL );
1583         }
1584         if ( rc == 0 ) {
1585                 e.e_name = *ndn;
1586                 e.e_nname = *ndn;
1587                 rc = ndb_entry_get_data( op, &NA, 0 );
1588                 ber_bvarray_free( NA.ocs );
1589                 if ( rc == 0 ) {
1590                         if ( oc && !is_entry_objectclass_or_sub( &e, oc )) {
1591                                 attrs_free( e.e_attrs );
1592                                 rc = 1;
1593                         }
1594                 }
1595         }
1596         if ( rc == 0 ) {
1597                 *ent = entry_alloc();
1598                 **ent = e;
1599                 ber_dupbv( &(*ent)->e_name, ndn );
1600                 ber_dupbv( &(*ent)->e_nname, ndn );
1601         } else {
1602                 rc = 1;
1603         }
1604         NA.txn->close();
1605         return rc;
1606 }
1607
1608 /* Congestion avoidance code
1609  * for Deadlock Rollback
1610  */
1611
1612 extern "C" void
1613 ndb_trans_backoff( int num_retries )
1614 {
1615         int i;
1616         int delay = 0;
1617         int pow_retries = 1;
1618         unsigned long key = 0;
1619         unsigned long max_key = -1;
1620         struct timeval timeout;
1621
1622         lutil_entropy( (unsigned char *) &key, sizeof( unsigned long ));
1623
1624         for ( i = 0; i < num_retries; i++ ) {
1625                 if ( i >= 5 ) break;
1626                 pow_retries *= 4;
1627         }
1628
1629         delay = 16384 * (key * (double) pow_retries / (double) max_key);
1630         delay = delay ? delay : 1;
1631
1632         Debug( LDAP_DEBUG_TRACE,  "delay = %d, num_retries = %d\n", delay, num_retries, 0 );
1633
1634         timeout.tv_sec = delay / 1000000;
1635         timeout.tv_usec = delay % 1000000;
1636         select( 0, NULL, NULL, NULL, &timeout );
1637 }
1638
1639 extern "C" void
1640 ndb_check_referral( Operation *op, SlapReply *rs, NdbArgs *NA )
1641 {
1642         struct berval dn, ndn;
1643         int i, dif;
1644         dif = NA->erdns - NA->rdns->nr_num;
1645
1646         /* Set full DN of matched into entry */
1647         for ( i=0; i<dif; i++ ) {
1648                 dnParent( &NA->e->e_name, &dn );
1649                 dnParent( &NA->e->e_nname, &ndn );
1650                 NA->e->e_name = dn;
1651                 NA->e->e_nname = ndn;
1652         }
1653
1654         /* return referral only if "disclose" is granted on the object */
1655         if ( access_allowed( op, NA->e, slap_schema.si_ad_entry,
1656                 NULL, ACL_DISCLOSE, NULL )) {
1657                 Attribute a;
1658                 for ( i=0; !BER_BVISNULL( &NA->ocs[i] ); i++ );
1659                 a.a_numvals = i;
1660                 a.a_desc = slap_schema.si_ad_objectClass;
1661                 a.a_vals = NA->ocs;
1662                 a.a_nvals = NA->ocs;
1663                 a.a_next = NULL;
1664                 NA->e->e_attrs = &a;
1665                 if ( is_entry_referral( NA->e )) {
1666                         NA->e->e_attrs = NULL;
1667                         ndb_entry_get_data( op, NA, 0 );
1668                         rs->sr_ref = get_entry_referrals( op, NA->e );
1669                         if ( rs->sr_ref ) {
1670                                 rs->sr_err = LDAP_REFERRAL;
1671                                 rs->sr_flags |= REP_REF_MUSTBEFREED;
1672                         }
1673                         attrs_free( NA->e->e_attrs );
1674                 }
1675                 NA->e->e_attrs = NULL;
1676         }
1677 }