1 /* ndbio.cpp - get/set/del data for NDB */
3 /* This work is part of OpenLDAP Software <http://www.openldap.org/>.
5 * Copyright 2008 The OpenLDAP Foundation.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted only as authorized by the OpenLDAP
12 * A copy of this license is available in the file LICENSE in the
13 * top-level directory of the distribution or, alternatively, at
14 * <http://www.OpenLDAP.org/license.html>.
17 * This work was initially developed by Howard Chu for inclusion
18 * in OpenLDAP Software. This work was sponsored by MySQL.
24 #include <ac/string.h>
30 /* For reference only */
31 typedef struct MedVar {
32 Int16 len; /* length is always little-endian */
37 static int ndb_name_cmp( const void *v1, const void *v2 );
38 static int ndb_oc_dup_err( void *v1, void *v2 );
42 ndb_name_cmp( const void *v1, const void *v2 )
44 NdbOcInfo *oc1 = (NdbOcInfo *)v1, *oc2 = (NdbOcInfo *)v2;
45 return ber_bvstrcasecmp( &oc1->no_name, &oc2->no_name );
49 ndb_oc_dup_err( void *v1, void *v2 )
51 NdbOcInfo *oc = (NdbOcInfo *)v2;
53 oc->no_oc = (ObjectClass *)v1;
57 /* Find an existing NdbAttrInfo */
58 extern "C" NdbAttrInfo *
59 ndb_ai_find( struct ndb_info *ni, AttributeType *at )
62 atmp.na_name = at->sat_cname;
64 return (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
67 /* Find or create an NdbAttrInfo */
68 extern "C" NdbAttrInfo *
69 ndb_ai_get( struct ndb_info *ni, struct berval *aname )
71 NdbAttrInfo atmp, *ai;
72 atmp.na_name = *aname;
74 ai = (NdbAttrInfo *)avl_find( ni->ni_ai_tree, &atmp, ndb_name_cmp );
77 AttributeDescription *ad = NULL;
79 if ( slap_bv2ad( aname, &ad, &text ))
82 ai = (NdbAttrInfo *)ch_malloc( sizeof( NdbAttrInfo ));
84 ai->na_attr = ai->na_desc->ad_type;
85 ai->na_name = ai->na_attr->sat_cname;
89 ai->na_len = ai->na_attr->sat_atype.at_syntax_len;
90 /* Reasonable default */
92 if ( ai->na_attr->sat_syntax == slap_schema.si_syn_distinguishedName )
98 if ( ai->na_len > 1024 )
100 avl_insert( &ni->ni_ai_tree, ai, ndb_name_cmp, avl_dup_error );
106 ndb_ai_check( struct ndb_info *ni, NdbOcInfo *oci, AttributeType **attrs, char **ptr, int *col,
109 NdbAttrInfo *ai, atmp;
112 for ( i=0; attrs[i]; i++ ) {
113 if ( attrs[i] == slap_schema.si_ad_objectClass->ad_type )
115 /* skip attrs that are in a superior */
116 if ( oci->no_oc && oci->no_oc->soc_sups ) {
119 for ( j=0; oci->no_oc->soc_sups[j]; j++ ) {
120 oc = oci->no_oc->soc_sups[j];
121 if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
123 if ( oc->soc_required ) {
124 for ( k=0; oc->soc_required[k]; k++ ) {
125 if ( attrs[i] == oc->soc_required[k] ) {
132 if ( oc->soc_allowed ) {
133 for ( k=0; oc->soc_allowed[k]; k++ ) {
134 if ( attrs[i] == oc->soc_allowed[k] ) {
146 ai = ndb_ai_get( ni, &attrs[i]->sat_cname );
148 /* can never happen */
152 /* An indexed attr is defined before its OC is */
155 ai->na_column = (*col)++;
158 oci->no_attrs[oci->no_nattrs++] = ai;
160 /* An attrset attr may already be defined */
161 if ( ai->na_oi != oci ) {
163 for ( j=0; j<oci->no_nsets; j++ )
164 if ( oci->no_sets[j] == ai->na_oi ) break;
165 if ( j >= oci->no_nsets ) {
166 /* FIXME: data loss if more sets are in use */
167 if ( oci->no_nsets < NDB_MAX_OCSETS ) {
168 oci->no_sets[oci->no_nsets++] = ai->na_oi;
175 *ptr += sprintf( *ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
183 ndb_oc_create( struct ndb_info *ni, NdbOcInfo *oci, int create )
185 char buf[4096], *ptr;
189 ptr = buf + sprintf( buf,
190 "CREATE TABLE `%s` (eid bigint unsigned NOT NULL PRIMARY KEY",
191 oci->no_table.bv_val );
195 if ( oci->no_oc->soc_required ) {
196 for ( i=0; oci->no_oc->soc_required[i]; i++ );
199 if ( oci->no_oc->soc_allowed ) {
200 for ( i=0; oci->no_oc->soc_allowed[i]; i++ );
203 /* assume all are present */
204 oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
207 ldap_pvt_thread_rdwr_wlock( &ni->ni_ai_rwlock );
208 if ( oci->no_oc->soc_required ) {
209 rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, &ptr, &col, create );
211 if ( !rc && oci->no_oc->soc_allowed ) {
212 rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, &ptr, &col, create );
214 ldap_pvt_thread_rdwr_wunlock( &ni->ni_ai_rwlock );
216 /* shrink down to just the needed size */
217 oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
218 oci->no_nattrs * sizeof(struct ndb_attrinfo *));
221 ptr = lutil_strcopy( ptr, " ) ENGINE=ndb" );
222 rc = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
224 Debug( LDAP_DEBUG_ANY,
225 "ndb_oc_create: CREATE TABLE %s failed, %s (%d)\n",
226 oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
232 /* Read table definitions from the DB and populate ObjectClassInfo */
234 ndb_oc_read( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict )
236 const NdbDictionary::Table *myTable;
237 const NdbDictionary::Column *myCol;
238 NdbOcInfo *oci, octmp;
241 NdbDictionary::Dictionary::List myList;
245 rc = myDict->listObjects( myList, NdbDictionary::Object::UserTable );
246 /* Populate our objectClass structures */
247 for ( i=0; i<myList.count; i++ ) {
248 /* Ignore other DBs */
249 if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
251 /* Ignore internal tables */
252 if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
254 ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
255 oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
259 oc = oc_bvfind( &octmp.no_name );
261 /* undefined - shouldn't happen */
264 myTable = myDict->getTable( myList.elements[i].name );
265 oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
266 oci->no_table.bv_val = (char *)(oci+1);
267 strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
268 oci->no_table.bv_len = oc->soc_cname.bv_len;
269 oci->no_name = oci->no_table;
275 /* Make space for all attrs, even tho sups will be dropped */
276 if ( oci->no_oc->soc_required ) {
277 for ( j=0; oci->no_oc->soc_required[j]; j++ );
280 if ( oci->no_oc->soc_allowed ) {
281 for ( j=0; oci->no_oc->soc_allowed[j]; j++ );
284 oci->no_attrs = (struct ndb_attrinfo **)ch_malloc( col * sizeof(struct ndb_attrinfo *));
285 avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
287 col = myTable->getNoOfColumns();
289 for ( j = 1; j<col; j++ ) {
290 myCol = myTable->getColumn( j );
291 ber_str2bv( myCol->getName(), 0, 0, &bv );
292 ai = ndb_ai_get( ni, &bv );
293 /* shouldn't happen */
298 ai->na_len = myCol->getLength();
301 /* Link to any attrsets */
302 for ( i=0; i<myList.count; i++ ) {
303 /* Ignore other DBs */
304 if ( strcmp( myList.elements[i].database, ni->ni_dbname ))
306 /* Ignore internal tables */
307 if ( !strncmp( myList.elements[i].name, "OL_", 3 ))
309 ber_str2bv( myList.elements[i].name, 0, 0, &octmp.no_name );
310 oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
311 /* shouldn't happen */
315 if ( oci->no_oc->soc_required ) {
316 rc = ndb_ai_check( ni, oci, oci->no_oc->soc_required, NULL, &col, 0 );
318 if ( oci->no_oc->soc_allowed ) {
319 rc = ndb_ai_check( ni, oci, oci->no_oc->soc_allowed, NULL, &col, 0 );
321 /* shrink down to just the needed size */
322 oci->no_attrs = (struct ndb_attrinfo **)ch_realloc( oci->no_attrs,
323 oci->no_nattrs * sizeof(struct ndb_attrinfo *));
329 ndb_oc_get( struct ndb_info *ni, const NdbDictionary::Dictionary *myDict,
330 struct berval *oname, NdbOcs *out )
332 const NdbDictionary::Table *myTable;
333 NdbOcInfo *oci, octmp;
338 if ( ber_bvstrcasecmp( oname, &slap_schema.si_oc_top->soc_cname )) {
339 octmp.no_name = *oname;
340 oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
344 oc = oc_bvfind( oname );
346 /* undefined - shouldn't happen */
347 return LDAP_INVALID_SYNTAX;
350 if ( oc->soc_sups ) {
353 for ( i=0; oc->soc_sups[i]; i++ ) {
354 rc = ndb_oc_get( ni, myDict, &oc->soc_sups[i]->soc_cname, out );
359 oc = slap_schema.si_oc_top;
361 /* Only insert once */
362 for ( i=0; i<out->no_ntext; i++ )
363 if ( out->no_text[i].bv_val == oc->soc_cname.bv_val )
365 if ( i == out->no_ntext )
366 out->no_text[out->no_ntext++] = oc->soc_cname;
368 /* ignore top, etc... */
369 if ( oc->soc_kind == LDAP_SCHEMA_ABSTRACT )
373 ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
374 oci = (NdbOcInfo *)ch_malloc( sizeof( NdbOcInfo )+oc->soc_cname.bv_len+1 );
375 oci->no_table.bv_val = (char *)(oci+1);
376 strcpy( oci->no_table.bv_val, oc->soc_cname.bv_val );
377 oci->no_table.bv_len = oc->soc_cname.bv_len;
378 oci->no_name = oci->no_table;
383 ldap_pvt_thread_rdwr_wlock( &ni->ni_oc_rwlock );
384 if ( avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, ndb_oc_dup_err )) {
385 octmp.no_oc = oci->no_oc;
387 oci = (NdbOcInfo *)octmp.no_oc;
389 /* see if the oc table already exists in the DB */
390 myTable = myDict->getTable( oci->no_table.bv_val );
391 rc = ndb_oc_create( ni, oci, myTable == NULL );
392 ldap_pvt_thread_rdwr_wunlock( &ni->ni_oc_rwlock );
393 ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
396 /* Only insert once */
397 for ( i=0; i<out->no_ninfo; i++ )
398 if ( out->no_info[i] == oci )
400 if ( i == out->no_ninfo )
401 out->no_info[out->no_ninfo++] = oci;
406 ndb_aset_get( struct ndb_info *ni, struct berval *sname, struct berval *attrs, NdbOcInfo **ret )
408 NdbOcInfo *oci, octmp;
411 octmp.no_name = *sname;
412 oci = (NdbOcInfo *)avl_find( ni->ni_oc_tree, &octmp, ndb_name_cmp );
414 return LDAP_ALREADY_EXISTS;
416 for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ ) {
417 if ( !at_bvfind( &attrs[i] ))
418 return LDAP_NO_SUCH_ATTRIBUTE;
422 oci = (NdbOcInfo *)ch_calloc( 1, sizeof( NdbOcInfo ) + sizeof( ObjectClass ) +
423 i*sizeof(AttributeType *) + sname->bv_len+1 );
424 oci->no_oc = (ObjectClass *)(oci+1);
425 oci->no_oc->soc_required = (AttributeType **)(oci->no_oc+1);
426 oci->no_table.bv_val = (char *)(oci->no_oc->soc_required+i);
428 for ( i=0; !BER_BVISNULL( &attrs[i] ); i++ )
429 oci->no_oc->soc_required[i] = at_bvfind( &attrs[i] );
431 strcpy( oci->no_table.bv_val, sname->bv_val );
432 oci->no_table.bv_len = sname->bv_len;
433 oci->no_name = oci->no_table;
434 oci->no_oc->soc_cname = oci->no_name;
435 oci->no_flag = NDB_INFO_ATSET;
437 rc = ndb_oc_create( ni, oci, 0 );
439 rc = avl_insert( &ni->ni_oc_tree, oci, ndb_name_cmp, avl_dup_error );
449 ndb_aset_create( struct ndb_info *ni, NdbOcInfo *oci )
451 char buf[4096], *ptr;
455 ptr = buf + sprintf( buf,
456 "CREATE TABLE IF NOT EXISTS `%s` (eid bigint unsigned NOT NULL PRIMARY KEY",
457 oci->no_table.bv_val );
459 for ( i=0; i<oci->no_nattrs; i++ ) {
460 if ( oci->no_attrs[i]->na_oi != oci )
462 ai = oci->no_attrs[i];
463 ptr += sprintf( ptr, ", `%s` VARCHAR(%d)", ai->na_attr->sat_cname.bv_val,
465 if ( ai->na_flag & NDB_INFO_INDEX ) {
466 ptr += sprintf( ptr, ", INDEX (`%s`)", ai->na_attr->sat_cname.bv_val );
469 ptr = lutil_strcopy( ptr, " ) ENGINE=ndb" );
470 i = mysql_real_query( &ni->ni_sql, buf, ptr - buf );
472 Debug( LDAP_DEBUG_ANY,
473 "ndb_aset_create: CREATE TABLE %s failed, %s (%d)\n",
474 oci->no_table.bv_val, mysql_error(&ni->ni_sql), mysql_errno(&ni->ni_sql) );
480 ndb_oc_check( BackendDB *be, Ndb *ndb,
481 struct berval *ocsin, NdbOcs *out )
483 struct ndb_info *ni = (struct ndb_info *) be->be_private;
484 const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
491 /* Find all objectclasses and their superiors. List
492 * the superiors first.
495 ldap_pvt_thread_rdwr_rlock( &ni->ni_oc_rwlock );
496 for ( i=0; !BER_BVISNULL( &ocsin[i] ); i++ ) {
497 rc = ndb_oc_get( ni, myDict, &ocsin[i], out );
500 ldap_pvt_thread_rdwr_runlock( &ni->ni_oc_rwlock );
504 /* set all the unique attrs of this objectclass into the table
505 * max row size is 8192 bytes; how do we detect if the row is too large?
507 * FIXME: Currently only stores the first value of any multivalued attribute.
512 const NdbDictionary::Table *myTable,
522 char buf[65538], *ptr;
524 NdbOperation *myop = retop ? *retop : NULL;
527 for ( i=0; i<nattrs; i++ ) {
528 /* Skip if not in this table */
529 if ( attrs[i]->na_oi != no )
531 for ( a=e->e_attrs; a; a=a->a_next ) {
532 if ( a->a_desc->ad_type == attrs[i]->na_attr )
535 /* If we found a match, set its value. If we found no match
536 * and we're updating, delete its value.
539 /* objectclass is in dn_idx_table */
540 if ( a && a->a_desc == slap_schema.si_ad_objectClass )
543 /* First attr, get the op, set the type and primary key */
545 Uint64 eid = e->e_id;
546 myop = txn->getNdbOperation( myTable );
550 if ( myop->writeTuple())
553 if ( myop->insertTuple())
556 if ( myop->equal( EID_COLUMN, eid ))
561 if ( a->a_vals[0].bv_len > attrs[i]->na_len ) {
562 Debug( LDAP_DEBUG_ANY, "ndb_oc_attrs: attribute %s too long for column\n",
563 attrs[i]->na_name.bv_val, 0, 0 );
564 return LDAP_CONSTRAINT_VIOLATION;
566 *ptr++ = a->a_vals[0].bv_len & 0xff;
567 if ( attrs[i]->na_len > 255 ) {
569 *ptr++ = a->a_vals[0].bv_len >> 8;
571 memcpy( ptr, a->a_vals[0].bv_val, a->a_vals[0].bv_len );
576 if ( myop->setValue( attrs[i]->na_column, ptr ))
588 const NdbDictionary::Dictionary *myDict,
589 NdbTransaction *txn, NdbOcInfo *no, Entry *e, int update )
591 const NdbDictionary::Table *myTable;
594 for ( i=0; i<no->no_nsets; i++ ) {
595 rc = ndb_oc_put( myDict, txn, no->no_sets[i], e, update );
600 myTable = myDict->getTable( no->no_table.bv_val );
605 return ndb_oc_attrs( txn, myTable, e, no, no->no_attrs, no->no_nattrs, update, &i, NULL );
615 struct ndb_info *ni = (struct ndb_info *) be->be_private;
618 const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
619 const NdbDictionary::Table *myTable;
624 /* Get the entry's objectClass attribute */
625 aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
629 ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
630 myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
632 /* Walk thru objectclasses, find all the attributes belonging to a class */
633 for ( i=0; i<myOcs.no_ninfo; i++ ) {
634 rc = ndb_oc_put( myDict, NA->txn, myOcs.no_info[i], NA->e, update );
642 ndb_oc_get( NdbOcInfo *no, int *j, int *nocs, NdbOcInfo ***oclist )
647 for ( i=0; i<no->no_nsets; i++ ) {
648 ndb_oc_get( no->no_sets[i], j, nocs, oclist );
652 ol2 = (NdbOcInfo **)ch_realloc( *oclist, *nocs * sizeof(NdbOcInfo *));
659 /* Retrieve attribute data for given entry. The entry's DN and eid should
660 * already be populated.
669 struct ndb_info *ni = (struct ndb_info *) be->be_private;
670 const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
671 const NdbDictionary::Table *myTable;
677 NdbOcInfo *oci, **oclist = NULL;
678 char abuf[65536], *ptr, **attrs = NULL;
680 /* FIXME: abuf should be dynamically allocated */
682 int i, j, k, nocs, nattrs, rc = LDAP_OTHER, alen;
684 attr_merge( NA->e, slap_schema.si_ad_objectClass, NA->ocs, NULL );
688 ndb_oc_check( be, NA->ndb, NA->ocs, &myOcs );
689 myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
690 nocs = myOcs.no_ninfo;
692 oclist = (NdbOcInfo **)ch_calloc( 1, nocs * sizeof(NdbOcInfo *));
694 for ( i=0, j=0; i<myOcs.no_ninfo; i++ ) {
695 ndb_oc_get( myOcs.no_info[i], &j, &nocs, &oclist );
700 for ( i=0; i<nocs; i++ )
701 nattrs += oclist[i]->no_nattrs;
703 attrs = (char **)ch_malloc( nattrs * sizeof(char *));
707 for ( i=0; i<nocs; i++ ) {
709 myTable = myDict->getTable( oci->no_table.bv_val );
711 myop = NA->txn->getNdbOperation( myTable );
714 if ( myop->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead ))
716 if ( myop->equal( EID_COLUMN, eid ))
719 for ( j=0; j<oci->no_nattrs; j++ ) {
720 if ( oci->no_attrs[j]->na_oi != oci )
724 if ( oci->no_attrs[j]->na_len > 255 )
726 ptr += oci->no_attrs[j]->na_len + 1;
727 myop->getValue( oci->no_attrs[j]->na_column, attrs[k++] );
730 if ( NA->txn->execute( update ? NdbTransaction::NoCommit : NdbTransaction::Commit,
731 update ? NdbOperation::AO_IgnoreError : NdbOperation::AbortOnError, 1) < 0 )
737 for ( i=0; i<nocs; i++ ) {
739 for ( j=0; j<oci->no_nattrs; j++ ) {
742 if ( oci->no_attrs[j]->na_oi != oci )
744 buf = (unsigned char *)attrs[k++];
746 if ( oci->no_attrs[j]->na_len > 255 ) {
748 len |= (buf[1] << 8);
756 a = attrs_alloc( nattrs );
757 NA->e->e_attrs->a_next = a;
759 for ( i=0; i<nocs; i++ ) {
761 for ( j=0; j<oci->no_nattrs; j++ ) {
763 struct berval bv, nbv;
764 if ( oci->no_attrs[j]->na_oi != oci )
766 buf = (unsigned char *)attrs[k++];
768 if ( oci->no_attrs[j]->na_len > 255 ) {
770 bv.bv_len |= (buf[1] << 8);
771 bv.bv_val = (char *)buf+2;
773 bv.bv_val = (char *)buf+1;
775 if ( bv.bv_len == 0 )
777 bv.bv_val[bv.bv_len] = '\0';
778 a->a_desc = oci->no_attrs[j]->na_desc;
779 attr_normalize_one( a->a_desc, &bv, &nbv, NULL );
783 value_add_one( &a->a_vals, &bv );
784 if ( !BER_BVISNULL( &nbv )) {
785 ber_bvarray_add( &a->a_nvals, &nbv );
787 a->a_nvals = a->a_vals;
807 const NdbDictionary::Dictionary *myDict,
808 NdbTransaction *txn, Uint64 eid, NdbOcInfo *no )
810 const NdbDictionary::Table *myTable;
814 for ( i=0; i<no->no_nsets; i++ ) {
815 rc = ndb_oc_del( myDict, txn, eid, no->no_sets[i] );
818 myTable = myDict->getTable( no->no_table.bv_val );
820 myop = txn->getNdbOperation( myTable );
823 if ( myop->deleteTuple() )
825 if ( myop->equal( EID_COLUMN, eid ))
837 struct ndb_info *ni = (struct ndb_info *) be->be_private;
838 const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
839 const NdbDictionary::Table *myTable;
841 Uint64 eid = NA->e->e_id;
845 ndb_oc_check( be, NA->ndb, NA->ocs, &myOcs );
846 myOcs.no_info[myOcs.no_ninfo++] = ni->ni_opattrs;
848 for ( i=0; i<myOcs.no_ninfo; i++ ) {
849 if ( ndb_oc_del( myDict, NA->txn, eid, myOcs.no_info[i] ))
866 end = dn->bv_val + dn->bv_len;
867 for ( i=0; i<NDB_MAX_RDNS; i++ ) {
868 for ( beg = end-1; beg > dn->bv_val; beg-- ) {
874 if ( beg >= dn->bv_val ) {
876 /* RDN is too long */
877 if ( len > NDB_RDN_LEN )
878 return LDAP_CONSTRAINT_VIOLATION;
879 memcpy( rdns->nr_buf[i]+1, beg, len );
883 rdns->nr_buf[i][0] = len;
886 /* Too many RDNs in DN */
887 if ( i == NDB_MAX_RDNS && beg > dn->bv_val ) {
888 return LDAP_CONSTRAINT_VIOLATION;
901 char dummy[2] = {0,0};
904 for ( i=0; i<rdns->nr_num; i++ ) {
905 if ( myop->equal( i+RDN_COLUMN, rdns->nr_buf[i] ))
908 for ( ; i<NDB_MAX_RDNS; i++ ) {
909 if ( myop->equal( i+RDN_COLUMN, dummy ))
915 /* Store the DN2ID_TABLE fields */
923 struct ndb_info *ni = (struct ndb_info *) be->be_private;
924 const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
925 const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
930 /* Get the entry's objectClass attribute; it's ok to be
931 * absent on a fresh insert
933 aoc = attr_find( NA->e->e_attrs, slap_schema.si_ad_objectClass );
934 if ( update && !aoc )
935 return LDAP_OBJECT_CLASS_VIOLATION;
937 myop = NA->txn->getNdbOperation( myTable );
941 if ( myop->updateTuple())
944 if ( myop->insertTuple())
948 if ( ndb_rdns2keys( myop, NA->rdns ))
953 Uint64 eid = NA->e->e_id;
954 if ( myop->setValue( EID_COLUMN, eid ))
958 /* Set list of objectClasses */
960 char *ptr, buf[sizeof(MedVar)];
964 ndb_oc_check( be, NA->ndb, aoc->a_nvals, &myOcs );
966 for ( i=0; i<myOcs.no_ntext; i++ ) {
968 if ( ptr + myOcs.no_text[i].bv_len >= &buf[sizeof(buf)] )
970 if ( i ) *ptr++ = ' ';
971 ptr = lutil_strcopy( ptr, myOcs.no_text[i].bv_val );
976 if ( myop->setValue( OCS_COLUMN, buf ))
980 /* Set any indexed attrs */
981 for ( a = NA->e->e_attrs; a; a=a->a_next ) {
982 ai = ndb_ai_find( ni, a->a_desc->ad_type );
983 if ( ai && ( ai->na_flag & NDB_INFO_INDEX )) {
984 char *ptr, buf[sizeof(MedVar)];
988 len = a->a_vals[0].bv_len;
989 /* FIXME: data loss */
990 if ( len > ai->na_len )
993 if ( ai->na_len > 255 ) {
996 memcpy( ptr, a->a_vals[0].bv_val, len );
997 if ( myop->setValue( ai->na_ixcol, buf ))
1005 extern "C" struct berval *
1012 struct berval *list, tmp;
1016 for ( i = 1, beg = str;; i++ ) {
1017 beg = strchr( beg, delim );
1024 list = (struct berval *)ch_malloc( (num+1)*sizeof(struct berval));
1026 for ( i = 0, beg = str; i<num; i++ ) {
1028 beg = strchr( beg, delim );
1030 tmp.bv_len = beg - tmp.bv_val;
1032 tmp.bv_len = len - (tmp.bv_val - str);
1034 ber_dupbv( &list[i], &tmp );
1038 BER_BVZERO( &list[i] );
1042 extern "C" struct berval *
1048 int len = ref[0] | (ref[1] << 8);
1049 return ndb_str2bvarray( (char *)ref+2, len, ' ' );
1052 /* Retrieve the DN2ID_TABLE fields. Can call with NULL ocs if just verifying
1053 * the existence of a DN.
1060 struct berval *matched
1063 struct ndb_info *ni = (struct ndb_info *) be->be_private;
1064 const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1065 const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1067 NdbRecAttr *attr1, *attr2;
1068 char idbuf[2*sizeof(ID)];
1069 char ocbuf[NDB_OC_BUFLEN];
1072 BER_BVZERO( matched );
1078 myop = NA->txn->getNdbOperation( myTable );
1083 if ( myop->readTuple( update ? NdbOperation::LM_Exclusive : NdbOperation::LM_CommittedRead )) {
1087 if ( !NA->rdns->nr_num && ndb_dn2rdns( &NA->e->e_name, NA->rdns )) {
1088 return LDAP_NO_SUCH_OBJECT;
1091 if ( ndb_rdns2keys( myop, NA->rdns )) {
1095 attr1 = myop->getValue( EID_COLUMN, idbuf );
1103 attr2 = myop->getValue( OCS_COLUMN, ocbuf );
1109 if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1113 switch( myop->getNdbError().code ) {
1115 if ( !attr1->isNULL() && ( NA->e->e_id = attr1->u_64_value() )) {
1116 /* If we didn't care about OCs, or we got them */
1117 if ( NA->ocs || ocbuf[0] || ocbuf[1] ) {
1118 /* If wanted, return them */
1120 NA->ocs = ndb_ref2oclist( ocbuf );
1125 case NDB_NO_SUCH_OBJECT: /* no such tuple: look for closest parent */
1127 NdbOperation *ops[NDB_MAX_RDNS];
1129 char dummy[2] = {0,0};
1131 /* get to last RDN, then back up 1 */
1132 k = NA->rdns->nr_num - 1;
1134 for ( i=0; i<k; i++ ) {
1135 ops[i] = NA->txn->getNdbOperation( myTable );
1138 if ( ops[i]->readTuple( NdbOperation::LM_CommittedRead ))
1140 for ( j=0; j<=i; j++ ) {
1141 if ( ops[i]->equal( j+RDN_COLUMN, NA->rdns->nr_buf[j] ))
1144 for ( ;j<NDB_MAX_RDNS; j++ ) {
1145 if ( ops[i]->equal( j+RDN_COLUMN, dummy ))
1149 if ( NA->txn->execute(NdbTransaction::NoCommit, NdbOperation::AO_IgnoreError, 1) < 0 ) {
1152 for ( --i; i>=0; i-- ) {
1153 if ( ops[i]->getNdbError().code == 0 ) {
1154 for ( j=0; j<=i; j++ )
1155 matched->bv_len += NA->rdns->nr_buf[j][0];
1156 matched->bv_len += i;
1157 matched->bv_val = NA->e->e_name.bv_val +
1158 NA->e->e_name.bv_len - matched->bv_len;
1163 return LDAP_NO_SUCH_OBJECT;
1177 struct ndb_info *ni = (struct ndb_info *) be->be_private;
1178 const NdbDictionary::Dictionary *myDict = NA->ndb->getDictionary();
1179 const NdbDictionary::Table *myTable = myDict->getTable( DN2ID_TABLE );
1182 myop = NA->txn->getNdbOperation( myTable );
1185 if ( myop->deleteTuple())
1188 if ( ndb_rdns2keys( myop, NA->rdns ))
1191 /* Let caller invoke the roundtrip */
1192 /* return txn->execute(NoCommit); */
1203 struct ndb_info *ni = (struct ndb_info *) be->be_private;
1204 const NdbDictionary::Dictionary *myDict = ndb->getDictionary();
1205 const NdbDictionary::Table *myTable = myDict->getTable( NEXTID_TABLE );
1210 Debug( LDAP_DEBUG_ANY, "ndb_next_id: " NEXTID_TABLE " table is missing\n",
1215 rc = ndb->getAutoIncrementValue( myTable, nid, 1000 );
1221 extern "C" { static void ndb_thread_hfree( void *key, void *data ); };
1223 ndb_thread_hfree( void *key, void *data )
1225 Ndb *ndb = (Ndb *)data;
1234 struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1237 if ( ldap_pvt_thread_pool_getkey( op->o_threadctx, ni, &data, NULL )) {
1240 ldap_pvt_thread_mutex_lock( &ni->ni_conn_mutex );
1241 myNdb = new Ndb( ni->ni_cluster[ni->ni_nextconn++], ni->ni_dbname );
1242 if ( ni->ni_nextconn >= ni->ni_nconns )
1243 ni->ni_nextconn = 0;
1244 ldap_pvt_thread_mutex_unlock( &ni->ni_conn_mutex );
1248 rc = myNdb->init(1024);
1251 Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1255 data = (void *)myNdb;
1256 if (( rc = ldap_pvt_thread_pool_setkey( op->o_threadctx, ni,
1257 data, ndb_thread_hfree, NULL, NULL ))) {
1259 Debug( LDAP_DEBUG_ANY, "ndb_thread_handle: err %d\n",
1273 AttributeDescription *ad,
1277 struct ndb_info *ni = (struct ndb_info *) op->o_bd->be_private;
1282 /* Get our NDB handle */
1283 rc = ndb_thread_handle( op, &NA.ndb );
1285 NA.txn = NA.ndb->startTransaction();
1287 Debug( LDAP_DEBUG_TRACE,
1288 LDAP_XSTRING(ndb_entry_get) ": startTransaction failed: %s (%d)\n",
1289 NA.ndb->getNdbError().message, NA.ndb->getNdbError().code, 0 );
1301 rc = ndb_entry_get_info( op->o_bd, &NA, rw, NULL );
1306 rc = ndb_entry_get_data( op->o_bd, &NA, 0 );
1307 ber_bvarray_free( NA.ocs );
1309 if ( oc && !is_entry_objectclass_or_sub( &e, oc )) {
1310 attrs_free( e.e_attrs );
1316 *ent = entry_alloc();
1318 ber_dupbv( &(*ent)->e_name, ndn );
1319 ber_dupbv( &(*ent)->e_nname, ndn );
1327 /* Congestion avoidance code
1328 * for Deadlock Rollback
1332 ndb_trans_backoff( int num_retries )
1336 int pow_retries = 1;
1337 unsigned long key = 0;
1338 unsigned long max_key = -1;
1339 struct timeval timeout;
1341 lutil_entropy( (unsigned char *) &key, sizeof( unsigned long ));
1343 for ( i = 0; i < num_retries; i++ ) {
1344 if ( i >= 5 ) break;
1348 delay = 16384 * (key * (double) pow_retries / (double) max_key);
1349 delay = delay ? delay : 1;
1351 Debug( LDAP_DEBUG_TRACE, "delay = %d, num_retries = %d\n", delay, num_retries, 0 );
1353 timeout.tv_sec = delay / 1000000;
1354 timeout.tv_usec = delay % 1000000;
1355 select( 0, NULL, NULL, NULL, &timeout );