]> git.sur5r.net Git - openldap/blob - servers/slapd/connection.c
EXPERIMENTAL: move slapd_remove to connections_read/_write as they
[openldap] / servers / slapd / connection.c
1 #include "portable.h"
2
3 #include <stdio.h>
4
5 #include <ac/socket.h>
6 #include <ac/errno.h>
7 #include <ac/signal.h>
8 #include <ac/string.h>
9 #include <ac/time.h>
10
11 #include "slap.h"
12
13 /* we need LBER internals */
14 #include "../../libraries/liblber/lber-int.h"
15
16 /* protected by connections_mutex */
17 static ldap_pvt_thread_mutex_t connections_mutex;
18 static Connection *connections = NULL;
19 static long conn_nextid = 0;
20
21 /* structure state (protected by connections_mutex) */
22 #define SLAP_C_UNINITIALIZED    0x00    /* MUST BE ZERO (0) */
23 #define SLAP_C_UNUSED                   0x01
24 #define SLAP_C_USED                             0x02
25
26 /* connection state (protected by c_mutex ) */
27 #define SLAP_C_INVALID                  0x00    /* MUST BE ZERO (0) */
28 #define SLAP_C_INACTIVE                 0x01    /* zero threads */
29 #define SLAP_C_ACTIVE                   0x02    /* one or more threads */
30 #define SLAP_C_BINDING                  0x03    /* binding */
31 #define SLAP_C_CLOSING                  0x04    /* closing */
32
33 static Connection* connection_get( int s );
34
35 static int connection_input( Connection *c );
36 static void connection_close( Connection *c );
37
38 static int connection_op_activate( Connection *conn, Operation *op );
39 static int connection_resched( Connection *conn );
40
41 struct co_arg {
42         Connection      *co_conn;
43         Operation       *co_op;
44 };
45
46 /*
47  * Initialize connection management infrastructure.
48  */
49 int connections_init(void)
50 {
51         assert( connections == NULL );
52
53         if( connections != NULL) {
54                 Debug( LDAP_DEBUG_ANY, "connections_init: already initialized.\n",
55                         0, 0, 0 );
56                 return -1;
57         }
58
59         /* should check return of every call */
60         ldap_pvt_thread_mutex_init( &connections_mutex );
61
62         connections = (Connection *) calloc( dtblsize, sizeof(Connection) );
63
64         if( connections == NULL ) {
65                 Debug( LDAP_DEBUG_ANY,
66                         "connections_init: allocation (%d*%ld) of connection array failed.\n",
67                         dtblsize, (long) sizeof(Connection), 0 );
68                 return -1;
69         }
70
71     assert( connections[0].c_struct_state == SLAP_C_UNINITIALIZED );
72     assert( connections[dtblsize-1].c_struct_state == SLAP_C_UNINITIALIZED );
73
74         /*
75          * per entry initialization of the Connection array initialization
76          * will be done by connection_init()
77          */ 
78
79         return 0;
80 }
81
82 /*
83  * Destroy connection management infrastructure.
84  */
85 int connections_destroy(void)
86 {
87         int i;
88
89         /* should check return of every call */
90
91         if( connections == NULL) {
92                 Debug( LDAP_DEBUG_ANY, "connections_destroy: nothing to destroy.\n",
93                         0, 0, 0 );
94                 return -1;
95         }
96
97         for ( i = 0; i < dtblsize; i++ ) {
98                 if( connections[i].c_struct_state != SLAP_C_UNINITIALIZED ) {
99                         ldap_pvt_thread_mutex_destroy( &connections[i].c_mutex );
100                         ldap_pvt_thread_mutex_destroy( &connections[i].c_write_mutex );
101                         ldap_pvt_thread_cond_destroy( &connections[i].c_write_cv );
102                 }
103         }
104
105         free( connections );
106         connections = NULL;
107
108         ldap_pvt_thread_mutex_destroy( &connections_mutex );
109         return 0;
110 }
111
112 /*
113  * shutdown all connections
114  */
115 int connections_shutdown(void)
116 {
117         int i;
118
119         ldap_pvt_thread_mutex_lock( &connections_mutex );
120
121         for ( i = 0; i < dtblsize; i++ ) {
122                 if( connections[i].c_struct_state != SLAP_C_USED ) {
123                         continue;
124                 }
125
126                 ldap_pvt_thread_mutex_lock( &connections[i].c_mutex );
127
128                 /* connections_mutex and c_mutex are locked */
129                 connection_closing( &connections[i] );
130                 connection_close( &connections[i] );
131
132                 ldap_pvt_thread_mutex_unlock( &connections[i].c_mutex );
133         }
134
135         ldap_pvt_thread_mutex_unlock( &connections_mutex );
136
137         return 0;
138 }
139
140 static Connection* connection_get( int s )
141 {
142         /* connections_mutex should be locked by caller */
143
144         Connection *c;
145
146         Debug( LDAP_DEBUG_ARGS,
147                 "connection_get(%d)\n",
148                 s, 0, 0 );
149
150         assert( connections != NULL );
151
152         if(s < 0) {
153                 return NULL;
154         }
155
156 #ifndef HAVE_WINSOCK
157         c = &connections[s];
158
159         assert( c->c_struct_state == SLAP_C_USED );
160
161         ldap_pvt_thread_mutex_lock( &c->c_mutex );
162
163         assert( c->c_conn_state != SLAP_C_INVALID );
164         assert( ber_pvt_sb_in_use( c->c_sb ) );
165
166 #else
167         c = NULL;
168         {
169                 int i;
170
171                 for(i=0; i<dtblsize; i++) {
172                         if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) {
173                                 assert( connections[i].c_conn_state == SLAP_C_INVALID );
174                                 assert( connections[i].c_sb == 0 );
175                                 break;
176                         }
177
178                         if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
179                                 assert( connections[i].c_conn_state == SLAP_C_INVALID );
180                                 assert( !ber_pvt_sb_in_use( connections[i].c_sb ) );
181                                 continue;
182                         }
183
184                         assert( connections[i].c_struct_state == SLAP_C_USED );
185                         assert( connections[i].c_conn_state != SLAP_C_INVALID );
186                         assert( ber_pvt_sb_in_use( connections[i].c_sb ) );
187
188                         if( ber_pvt_sb_get_desc( connections[i].c_sb ) == s ) {
189                                 c = &connections[i];
190                                 ldap_pvt_thread_mutex_lock( &c->c_mutex );
191                                 break;
192                         }
193                 }
194         }
195 #endif
196
197         if( c != NULL ) {
198                 /* we do this AFTER locking to aid in debugging */
199                 Debug( LDAP_DEBUG_TRACE,
200                         "connection_get(%d): got connid=%ld\n",
201                         s, c->c_connid, 0 );
202         }
203         return c;
204 }
205
206 static void connection_return( Connection *c )
207 {
208         ldap_pvt_thread_mutex_unlock( &c->c_mutex );
209 }
210
211 long connection_init(
212         int s,
213         const char* name,
214         const char* addr)
215 {
216         long id;
217         Connection *c;
218         assert( connections != NULL );
219
220         if( s < 0 ) {
221         Debug( LDAP_DEBUG_ANY,
222                         "connection_init(%d): invalid.\n",
223                         s, 0, 0 );
224                 return -1;
225         }
226
227         assert( s >= 0 );
228 #ifndef HAVE_WINSOCK
229         assert( s < dtblsize );
230 #endif
231
232         ldap_pvt_thread_mutex_lock( &connections_mutex );
233
234 #ifndef HAVE_WINSOCK
235         c = &connections[s];
236
237 #else
238         {
239                 int i;
240
241                 c = NULL;
242
243         for( i=0; i < dtblsize; i++) {
244             if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) {
245                 assert( connections[i].c_sb == 0 );
246                 c = &connections[i];
247                 break;
248             }
249
250             if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
251                 assert( !ber_pvt_sb_in_use( connections[i].c_sb ));
252                 c = &connections[i];
253                 break;
254             }
255
256             assert( connections[i].c_struct_state == SLAP_C_USED );
257             assert( connections[i].c_conn_state != SLAP_C_INVALID );
258             assert( ber_pvt_sb_in_use( connections[i].c_sb ));
259         }
260
261         if( c == NULL ) {
262                 Debug( LDAP_DEBUG_ANY,
263                                 "connection_init(%d): connection table full (%d/%d).\n",
264                                 s, i, dtblsize);
265             ldap_pvt_thread_mutex_unlock( &connections_mutex );
266             return -1;
267         }
268     }
269 #endif
270
271     assert( c != NULL );
272     assert( c->c_struct_state != SLAP_C_USED );
273     assert( c->c_conn_state == SLAP_C_INVALID );
274
275     if( c->c_struct_state == SLAP_C_UNINITIALIZED ) {
276         c->c_dn = NULL;
277         c->c_cdn = NULL;
278         c->c_client_name = NULL;
279         c->c_client_addr = NULL;
280         c->c_ops = NULL;
281         c->c_pending_ops = NULL;
282
283         c->c_sb = ber_sockbuf_alloc( );
284
285         /* should check status of thread calls */
286         ldap_pvt_thread_mutex_init( &c->c_mutex );
287         ldap_pvt_thread_mutex_init( &c->c_write_mutex );
288         ldap_pvt_thread_cond_init( &c->c_write_cv );
289
290         c->c_struct_state = SLAP_C_UNUSED;
291     }
292
293     ldap_pvt_thread_mutex_lock( &c->c_mutex );
294
295     assert( c->c_struct_state == SLAP_C_UNUSED );
296     assert(     c->c_dn == NULL );
297     assert(     c->c_cdn == NULL );
298     assert( c->c_client_name == NULL );
299     assert( c->c_client_addr == NULL );
300     assert( c->c_ops == NULL );
301     assert( c->c_pending_ops == NULL );
302
303     c->c_client_name = ch_strdup( name == NULL ? "" : name );
304     c->c_client_addr = ch_strdup( addr );
305
306     c->c_n_ops_received = 0;
307 #ifdef LDAP_COUNTERS
308     c->c_n_ops_executing = 0;
309     c->c_n_ops_pending = 0;
310     c->c_n_ops_completed = 0;
311 #endif
312
313     c->c_starttime = slap_get_time();
314
315     ber_pvt_sb_set_desc( c->c_sb, s );
316     ber_pvt_sb_set_io( c->c_sb, &ber_pvt_sb_io_tcp, NULL );
317
318     if( ber_pvt_sb_set_nonblock( c->c_sb, 1 ) < 0 ) {
319         Debug( LDAP_DEBUG_ANY,
320             "connection_init(%d, %s, %s): set nonblocking failed\n",
321             s, c->c_client_name, c->c_client_addr);
322     }
323
324     id = c->c_connid = conn_nextid++;
325
326     c->c_conn_state = SLAP_C_INACTIVE;
327     c->c_struct_state = SLAP_C_USED;
328
329     ldap_pvt_thread_mutex_unlock( &c->c_mutex );
330     ldap_pvt_thread_mutex_unlock( &connections_mutex );
331
332     return id;
333 }
334
335 static void
336 connection_destroy( Connection *c )
337 {
338         /* note: connections_mutex should be locked by caller */
339
340     assert( connections != NULL );
341     assert( c != NULL );
342     assert( c->c_struct_state != SLAP_C_UNUSED );
343     assert( c->c_conn_state != SLAP_C_INVALID );
344     assert( c->c_ops == NULL );
345
346     c->c_struct_state = SLAP_C_UNUSED;
347     c->c_conn_state = SLAP_C_INVALID;
348
349 #ifdef LDAP_COMPAT30
350     c->c_version = 0;
351 #endif
352     c->c_protocol = 0;
353
354     c->c_starttime = 0;
355
356     if(c->c_dn != NULL) {
357         free(c->c_dn);
358         c->c_dn = NULL;
359     }
360         if(c->c_cdn != NULL) {
361                 free(c->c_cdn);
362                 c->c_cdn = NULL;
363         }
364         if(c->c_client_name != NULL) {
365                 free(c->c_client_name);
366                 c->c_client_name = NULL;
367         }
368         if(c->c_client_addr != NULL) {
369                 free(c->c_client_addr);
370                 c->c_client_addr = NULL;
371         }
372
373         if ( ber_pvt_sb_in_use(c->c_sb) ) {
374                 int sd = ber_pvt_sb_get_desc(c->c_sb);
375
376                 slapd_remove( sd, 0 );
377                 ber_pvt_sb_close( c->c_sb );
378
379                 Statslog( LDAP_DEBUG_STATS,
380                     "conn=%d fd=%d closed.\n",
381                         c->c_connid, sd, 0, 0, 0 );
382         }
383
384         ber_pvt_sb_destroy( c->c_sb );
385 }
386
387 int connection_state_closing( Connection *c )
388 {
389         /* c_mutex must be locked by caller */
390
391         int state;
392         assert( c != NULL );
393         assert( c->c_struct_state == SLAP_C_USED );
394
395         state = c->c_conn_state;
396
397         assert( state != SLAP_C_INVALID );
398
399         return state == SLAP_C_CLOSING;
400 }
401
402 void connection_closing( Connection *c )
403 {
404         assert( connections != NULL );
405         assert( c != NULL );
406         assert( c->c_struct_state == SLAP_C_USED );
407         assert( c->c_conn_state != SLAP_C_INVALID );
408
409         /* c_mutex must be locked by caller */
410
411         if( c->c_conn_state != SLAP_C_CLOSING ) {
412                 Operation *o;
413
414                 Debug( LDAP_DEBUG_TRACE,
415                         "connection_closing: readying conn=%ld sd=%d for close.\n",
416                         c->c_connid, ber_pvt_sb_get_desc( c->c_sb ), 0 );
417
418                 /* update state to closing */
419                 c->c_conn_state = SLAP_C_CLOSING;
420
421                 /* don't listen on this port anymore */
422                 slapd_clr_read( ber_pvt_sb_get_desc( c->c_sb ), 1 );
423
424                 /* shutdown I/O -- not yet implemented */
425
426                 /* abandon active operations */
427                 for( o = c->c_ops; o != NULL; o = o->o_next ) {
428                         ldap_pvt_thread_mutex_lock( &o->o_abandonmutex );
429                         o->o_abandon = 1;
430                         ldap_pvt_thread_mutex_unlock( &o->o_abandonmutex );
431                 }
432
433                 /* remove pending operations */
434                 for( o = slap_op_pop( &c->c_pending_ops );
435                         o != NULL;
436                         o = slap_op_pop( &c->c_pending_ops ) )
437                 {
438                         slap_op_free( o );
439                 }
440
441                 /* wake write blocked operations */
442                 slapd_clr_write( ber_pvt_sb_get_desc(c->c_sb), 1 );
443                 ldap_pvt_thread_cond_signal( &c->c_write_cv );
444         }
445 }
446
447 static void connection_close( Connection *c )
448 {
449         assert( connections != NULL );
450         assert( c != NULL );
451         assert( c->c_struct_state == SLAP_C_USED );
452         assert( c->c_conn_state == SLAP_C_CLOSING );
453
454         /* note: connections_mutex and c_mutex should be locked by caller */
455
456         if( c->c_ops != NULL ) {
457                 Debug( LDAP_DEBUG_TRACE,
458                         "connection_close: deferring conn=%ld sd=%d.\n",
459                         c->c_connid, ber_pvt_sb_get_desc( c->c_sb ), 0 );
460
461                 return;
462         }
463
464         Debug( LDAP_DEBUG_TRACE, "connection_close: conn=%ld sd=%d.\n",
465                 c->c_connid, ber_pvt_sb_get_desc( c->c_sb ), 0 );
466
467         connection_destroy( c );
468 }
469
470 long connections_nextid(void)
471 {
472         long id;
473         assert( connections != NULL );
474
475         ldap_pvt_thread_mutex_lock( &connections_mutex );
476
477         id = conn_nextid;
478
479         ldap_pvt_thread_mutex_unlock( &connections_mutex );
480
481         return id;
482 }
483
484 Connection* connection_first( int *index )
485 {
486         assert( connections != NULL );
487         assert( index != NULL );
488
489         ldap_pvt_thread_mutex_lock( &connections_mutex );
490
491         *index = 0;
492
493         return connection_next(NULL, index);
494 }
495
496 Connection* connection_next( Connection *c, int *index )
497 {
498         assert( connections != NULL );
499         assert( index != NULL );
500         assert( *index <= dtblsize );
501
502         if( c != NULL ) {
503                 ldap_pvt_thread_mutex_unlock( &c->c_mutex );
504         }
505
506         c = NULL;
507
508         for(; *index < dtblsize; (*index)++) {
509                 if( connections[*index].c_struct_state == SLAP_C_UNINITIALIZED ) {
510                         assert( connections[*index].c_conn_state == SLAP_C_INVALID );
511 #ifndef HAVE_WINSOCK
512                         continue;
513 #else
514                         break;
515 #endif
516                 }
517
518                 if( connections[*index].c_struct_state == SLAP_C_USED ) {
519                         assert( connections[*index].c_conn_state != SLAP_C_INVALID );
520                         c = &connections[(*index)++];
521                         break;
522                 }
523
524                 assert( connections[*index].c_struct_state == SLAP_C_UNUSED );
525                 assert( connections[*index].c_conn_state == SLAP_C_INVALID );
526         }
527
528         if( c != NULL ) {
529                 ldap_pvt_thread_mutex_lock( &c->c_mutex );
530         }
531
532         return c;
533 }
534
535 void connection_done( Connection *c )
536 {
537         assert( connections != NULL );
538
539         if( c != NULL ) {
540                 ldap_pvt_thread_mutex_unlock( &c->c_mutex );
541         }
542
543         ldap_pvt_thread_mutex_unlock( &connections_mutex );
544 }
545
546 /*
547  * connection_activity - handle the request operation op on connection
548  * conn.  This routine figures out what kind of operation it is and
549  * calls the appropriate stub to handle it.
550  */
551
552 static void *
553 connection_operation( void *arg_v )
554 {
555         struct co_arg   *arg = arg_v;
556         int tag = arg->co_op->o_tag;
557         Connection *conn = arg->co_conn;
558
559 #ifdef LDAP_COUNTERS
560         ldap_pvt_thread_mutex_lock( &num_ops_mutex );
561         num_ops_initiated++;
562         ldap_pvt_thread_mutex_unlock( &num_ops_mutex );
563 #endif
564
565         switch ( tag ) {
566         case LDAP_REQ_BIND:
567                 do_bind( conn, arg->co_op );
568                 break;
569
570 #ifdef LDAP_COMPAT30
571         case LDAP_REQ_UNBIND_30:
572 #endif
573         case LDAP_REQ_UNBIND:
574                 do_unbind( conn, arg->co_op );
575                 break;
576
577         case LDAP_REQ_ADD:
578                 do_add( conn, arg->co_op );
579                 break;
580
581 #ifdef LDAP_COMPAT30
582         case LDAP_REQ_DELETE_30:
583 #endif
584         case LDAP_REQ_DELETE:
585                 do_delete( conn, arg->co_op );
586                 break;
587
588         case LDAP_REQ_MODRDN:
589                 do_modrdn( conn, arg->co_op );
590                 break;
591
592         case LDAP_REQ_MODIFY:
593                 do_modify( conn, arg->co_op );
594                 break;
595
596         case LDAP_REQ_COMPARE:
597                 do_compare( conn, arg->co_op );
598                 break;
599
600         case LDAP_REQ_SEARCH:
601                 do_search( conn, arg->co_op );
602                 break;
603
604 #ifdef LDAP_COMPAT30
605         case LDAP_REQ_ABANDON_30:
606 #endif
607         case LDAP_REQ_ABANDON:
608                 do_abandon( conn, arg->co_op );
609                 break;
610
611         default:
612                 Debug( LDAP_DEBUG_ANY, "unknown request 0x%lx\n",
613                     arg->co_op->o_tag, 0, 0 );
614                 break;
615         }
616
617 #ifdef LDAP_COUNTERS
618         ldap_pvt_thread_mutex_lock( &num_ops_mutex );
619         num_ops_completed++;
620         ldap_pvt_thread_mutex_unlock( &num_ops_mutex );
621 #endif
622
623         ldap_pvt_thread_mutex_lock( &conn->c_mutex );
624
625 #ifdef LDAP_COUNTERS
626         conn->c_n_ops_completed++;
627 #endif
628
629         slap_op_remove( &conn->c_ops, arg->co_op );
630         slap_op_free( arg->co_op );
631         arg->co_op = NULL;
632         arg->co_conn = NULL;
633         free( (char *) arg );
634         arg = NULL;
635
636         switch( tag ) {
637 #ifdef LDAP_COMPAT30
638         case LDAP_REQ_UNBIND_30:
639 #endif
640         case LDAP_REQ_UNBIND:
641                 /* c_mutex is locked */
642                 connection_closing( conn );
643                 break;
644
645         case LDAP_REQ_BIND:
646                 if( conn->c_conn_state == SLAP_C_BINDING) {
647                         conn->c_conn_state = SLAP_C_ACTIVE;
648                 }
649         }
650
651         ldap_pvt_thread_mutex_lock( &active_threads_mutex );
652         active_threads--;
653         if( active_threads < 1 ) {
654                 ldap_pvt_thread_cond_signal(&active_threads_cond);
655         }
656         ldap_pvt_thread_mutex_unlock( &active_threads_mutex );
657
658         connection_resched( conn );
659
660         ldap_pvt_thread_mutex_unlock( &conn->c_mutex );
661
662         return NULL;
663 }
664
665 int connection_read(int s)
666 {
667         int rc = 0;
668         Connection *c;
669         assert( connections != NULL );
670
671         ldap_pvt_thread_mutex_lock( &connections_mutex );
672
673         /* get (locked) connection */
674         c = connection_get( s );
675
676         if( c == NULL ) {
677                 Debug( LDAP_DEBUG_ANY,
678                         "connection_read(%d): no connection!\n",
679                         s, 0, 0 );
680
681                 slapd_remove(s, 0);
682
683                 ldap_pvt_thread_mutex_unlock( &connections_mutex );
684                 return -1;
685         }
686
687         if( c->c_conn_state == SLAP_C_CLOSING ) {
688                 Debug( LDAP_DEBUG_TRACE,
689                         "connection_read(%d): closing, ignoring input for id=%ld\n",
690                         s, c->c_connid, 0 );
691
692                 connection_return( c );
693                 ldap_pvt_thread_mutex_unlock( &connections_mutex );
694                 return 0;
695         }
696
697         Debug( LDAP_DEBUG_TRACE,
698                 "connection_read(%d): checking for input on id=%ld\n",
699                 s, c->c_connid, 0 );
700
701 #define CONNECTION_INPUT_LOOP 1
702
703 #ifdef DATA_READY_LOOP
704         while(!rc && ber_pvt_sb_data_ready(&c->c_sb))
705 #elif CONNECTION_INPUT_LOOP
706         while(!rc)
707 #endif
708         {
709                 rc = connection_input( c );
710         }
711
712         if( rc < 0 ) {
713                 Debug( LDAP_DEBUG_TRACE,
714                         "connection_read(%d): input error=%d id=%ld, closing.\n",
715                         s, rc, c->c_connid );
716
717                 /* connections_mutex and c_mutex are locked */
718                 connection_closing( c );
719                 connection_close( c );
720         }
721
722         connection_return( c );
723         ldap_pvt_thread_mutex_unlock( &connections_mutex );
724         return 0;
725 }
726
727 static int
728 connection_input(
729     Connection *conn
730 )
731 {
732         Operation *op;
733         unsigned long   tag, len;
734         long            msgid;
735         BerElement      *ber;
736
737         if ( conn->c_currentber == NULL && (conn->c_currentber = ber_alloc())
738             == NULL ) {
739                 Debug( LDAP_DEBUG_ANY, "ber_alloc failed\n", 0, 0, 0 );
740                 return -1;
741         }
742
743         errno = 0;
744         if ( (tag = ber_get_next( conn->c_sb, &len, conn->c_currentber ))
745             != LDAP_TAG_MESSAGE )
746         {
747                 int err = errno;
748
749                 Debug( LDAP_DEBUG_TRACE,
750                         "ber_get_next on fd %d failed errno %d (%s)\n",
751                         ber_pvt_sb_get_desc( conn->c_sb ), err,
752                         err > -1 && err < sys_nerr ?  sys_errlist[err] : "unknown" );
753                 Debug( LDAP_DEBUG_TRACE,
754                         "\t*** got %ld of %lu so far\n",
755                         (long)(conn->c_currentber->ber_rwptr - conn->c_currentber->ber_buf),
756                         conn->c_currentber->ber_len, 0 );
757
758                 if ( err != EWOULDBLOCK && err != EAGAIN ) {
759                         /* log, close and send error */
760                         ber_free( conn->c_currentber, 1 );
761                         conn->c_currentber = NULL;
762
763                         return -2;
764                 }
765                 return 1;
766         }
767
768         ber = conn->c_currentber;
769         conn->c_currentber = NULL;
770
771         if ( (tag = ber_get_int( ber, &msgid )) != LDAP_TAG_MSGID ) {
772                 /* log, close and send error */
773                 Debug( LDAP_DEBUG_ANY, "ber_get_int returns 0x%lx\n", tag, 0,
774                     0 );
775                 ber_free( ber, 1 );
776                 return -1;
777         }
778
779         if ( (tag = ber_peek_tag( ber, &len )) == LBER_ERROR ) {
780                 /* log, close and send error */
781                 Debug( LDAP_DEBUG_ANY, "ber_peek_tag returns 0x%lx\n", tag, 0,
782                     0 );
783                 ber_free( ber, 1 );
784
785                 return -1;
786         }
787
788 #ifdef LDAP_COMPAT30
789         if ( conn->c_version == 30 ) {
790                 (void) ber_skip_tag( ber, &len );
791         }
792 #endif
793
794         op = slap_op_alloc( ber, msgid, tag, conn->c_n_ops_received++ );
795
796         if ( conn->c_conn_state == SLAP_C_BINDING
797                 || conn->c_conn_state == SLAP_C_CLOSING )
798         {
799                 Debug( LDAP_DEBUG_ANY, "deferring operation\n", 0, 0, 0 );
800                 slap_op_add( &conn->c_pending_ops, op );
801
802         } else {
803                 connection_op_activate( conn, op );
804         }
805
806 #ifdef NO_THREADS
807         if ( conn->c_struct_state != SLAP_C_USED ) {
808                 /* connection must have got closed underneath us */
809                 return 1;
810         }
811 #endif
812         assert( conn->c_struct_state == SLAP_C_USED );
813
814         return 0;
815 }
816
817 static int
818 connection_resched( Connection *conn )
819 {
820         Operation *op;
821
822         if( conn->c_conn_state == SLAP_C_CLOSING ) {
823                 Debug( LDAP_DEBUG_TRACE,
824                         "connection_resched: attempting closing conn=%ld sd=%d.\n",
825                         conn->c_connid, ber_pvt_sb_get_desc( conn->c_sb ), 0 );
826
827                 connection_close( conn );
828                 return 0;
829         }
830
831         if( conn->c_conn_state != SLAP_C_ACTIVE ) {
832                 /* other states need different handling */
833                 return 0;
834         }
835
836         for( op = slap_op_pop( &conn->c_pending_ops );
837                 op != NULL;
838                 op = slap_op_pop( &conn->c_pending_ops ) )
839         {
840                 /* pending operations should not be marked for abandonment */
841                 assert(!op->o_abandon);
842
843                 connection_op_activate( conn, op );
844
845                 if ( conn->c_conn_state == SLAP_C_BINDING ) {
846                         break;
847                 }
848         }
849         return 0;
850 }
851
852 static int connection_op_activate( Connection *conn, Operation *op )
853 {
854         struct co_arg *arg;
855         char *tmpdn;
856         int status;
857         unsigned long tag = op->o_tag;
858
859         if ( conn->c_dn != NULL ) {
860                 tmpdn = ch_strdup( conn->c_dn );
861         } else {
862                 tmpdn = NULL;
863         }
864
865         arg = (struct co_arg *) ch_malloc( sizeof(struct co_arg) );
866         arg->co_conn = conn;
867         arg->co_op = op;
868
869         arg->co_op->o_dn = ch_strdup( tmpdn != NULL ? tmpdn : "" );
870         arg->co_op->o_ndn = dn_normalize_case( ch_strdup( arg->co_op->o_dn ) );
871
872         slap_op_add( &conn->c_ops, arg->co_op );
873
874         if(tag == LDAP_REQ_BIND) {
875                 conn->c_conn_state = SLAP_C_BINDING;
876         }
877
878         if ( tmpdn != NULL ) {
879                 free( tmpdn );
880         }
881
882         ldap_pvt_thread_mutex_lock( &active_threads_mutex );
883         active_threads++;
884         ldap_pvt_thread_mutex_unlock( &active_threads_mutex );
885
886         status = ldap_pvt_thread_create( &arg->co_op->o_tid, 1,
887                                          connection_operation, (void *) arg );
888
889         if ( status != 0 ) {
890                 Debug( LDAP_DEBUG_ANY,
891                 "ldap_pvt_thread_create failed (%d)\n", status, 0, 0 );
892
893                 /* should move op to pending list */
894         }
895
896         return status;
897 }
898
899 int connection_write(int s)
900 {
901         Connection *c;
902         assert( connections != NULL );
903
904         ldap_pvt_thread_mutex_lock( &connections_mutex );
905
906         c = connection_get( s );
907
908         slapd_clr_write( s, 0);
909
910         if( c == NULL ) {
911                 Debug( LDAP_DEBUG_ANY,
912                         "connection_write(%d): no connection!\n",
913                         s, 0, 0 );
914                 slapd_remove(s, 0);
915                 ldap_pvt_thread_mutex_unlock( &connections_mutex );
916                 return -1;
917         }
918
919         Debug( LDAP_DEBUG_TRACE,
920                 "connection_write(%d): waking output for id=%ld\n",
921                 s, c->c_connid, 0 );
922
923         ldap_pvt_thread_cond_signal( &c->c_write_cv );
924
925         connection_return( c );
926         ldap_pvt_thread_mutex_unlock( &connections_mutex );
927         return 0;
928 }