]> git.sur5r.net Git - openldap/blob - servers/slapd/connection.c
4d8d339e6e1995b3885a373aad8524b880d30fb9
[openldap] / servers / slapd / connection.c
1 #include "portable.h"
2
3 #include <stdio.h>
4
5 #include <ac/socket.h>
6 #include <ac/errno.h>
7 #include <ac/signal.h>
8 #include <ac/string.h>
9 #include <ac/time.h>
10
11 #include "slap.h"
12
13 /* we need LBER internals */
14 #include "../../libraries/liblber/lber-int.h"
15
16 /* protected by connections_mutex */
17 static ldap_pvt_thread_mutex_t connections_mutex;
18 static Connection *connections = NULL;
19 static int conn_index = -1;
20 static long conn_nextid = 0;
21
/*
 * Slot (structure) state, stored in c_struct_state.
 * Protected by connections_mutex.
 */
#define SLAP_C_UNINITIALIZED	0x0	/* MUST BE ZERO (0) */
#define SLAP_C_UNUSED		0x1
#define SLAP_C_USED		0x2

/*
 * Connection state, stored in c_conn_state.
 * Protected by the connection's own c_mutex.
 */
#define SLAP_C_INVALID		0x0	/* MUST BE ZERO (0) */
#define SLAP_C_INACTIVE		0x1	/* zero threads */
#define SLAP_C_ACTIVE		0x2	/* one or more threads */
#define SLAP_C_BINDING		0x3	/* binding */
#define SLAP_C_CLOSING		0x4	/* closing */
33
34 void slapd_remove(int s);
35 static Connection* connection_get( int s );
36
37 static int connection_input( Connection *c );
38 static void connection_close( Connection *c );
39
40 static int connection_op_activate( Connection *conn, Operation *op );
41 static int connection_resched( Connection *conn );
42
43 struct co_arg {
44         Connection      *co_conn;
45         Operation       *co_op;
46 };
47
48 /*
49  * Initialize connection management infrastructure.
50  */
51 int connections_init(void)
52 {
53         int i;
54
55         assert( connections == NULL );
56
57         if( connections != NULL) {
58                 Debug( LDAP_DEBUG_ANY, "connections_init: already initialized.\n",
59                         0, 0, 0 );
60                 return -1;
61         }
62
63         /* should check return of every call */
64         ldap_pvt_thread_mutex_init( &connections_mutex );
65
66         connections = (Connection *) calloc( dtblsize, sizeof(Connection) );
67
68         if( connections == NULL ) {
69                 Debug( LDAP_DEBUG_ANY,
70                         "connections_init: allocation (%d*%ld) of connection array failed.\n",
71                         dtblsize, (long) sizeof(Connection), 0 );
72                 return -1;
73         }
74
75         for ( i = 0; i < dtblsize; i++ )
76                 memset( &connections[i], 0, sizeof(Connection) );
77
78         /*
79          * per entry initialization of the Connection array initialization
80          * will be done by connection_init()
81          */ 
82
83         return 0;
84 }
85
86 /*
87  * Destroy connection management infrastructure.
88  */
89 int connections_destroy(void)
90 {
91         int i;
92
93         /* should check return of every call */
94
95         if( connections == NULL) {
96                 Debug( LDAP_DEBUG_ANY, "connections_destroy: nothing to destroy.\n",
97                         0, 0, 0 );
98                 return -1;
99         }
100
101         for ( i = 0; i < dtblsize; i++ ) {
102                 if( connections[i].c_struct_state != SLAP_C_UNINITIALIZED ) {
103                         ldap_pvt_thread_mutex_destroy( &connections[i].c_mutex );
104                         ldap_pvt_thread_mutex_destroy( &connections[i].c_write_mutex );
105                         ldap_pvt_thread_cond_destroy( &connections[i].c_write_cv );
106                 }
107         }
108
109         free( connections );
110         connections = NULL;
111
112         ldap_pvt_thread_mutex_destroy( &connections_mutex );
113         return 0;
114 }
115
116 /*
117  * shutdown all connections
118  */
119 int connections_shutdown(void)
120 {
121         int i;
122
123         ldap_pvt_thread_mutex_lock( &connections_mutex );
124
125         for ( i = 0; i < dtblsize; i++ ) {
126                 if( connections[i].c_struct_state != SLAP_C_USED ) {
127                         continue;
128                 }
129
130                 ldap_pvt_thread_mutex_lock( &connections[i].c_mutex );
131                 connection_closing( &connections[i] );
132                 connection_close( &connections[i] );
133                 ldap_pvt_thread_mutex_unlock( &connections[i].c_mutex );
134         }
135
136         ldap_pvt_thread_mutex_unlock( &connections_mutex );
137
138         return 0;
139 }
140
141 static Connection* connection_get( int s )
142 {
143         Connection *c = NULL;
144
145         assert( connections != NULL );
146
147         if(s < 0) {
148                 return NULL;
149         }
150
151 #ifndef HAVE_WINSOCK
152         assert( connections[s].c_struct_state == SLAP_C_USED );
153         assert( connections[s].c_conn_state != SLAP_C_INVALID );
154         assert( ber_pvt_sb_in_use( connections[s].c_sb ) );
155
156         c = &connections[s];
157 #else
158         {
159                 int i;
160
161                 for(i=0; i<dtblsize; i++) {
162                         if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) {
163                                 assert( connections[i].c_conn_state == SLAP_C_INVALID );
164                                 assert( connections[i].c_sb == 0 );
165                                 break;
166                         }
167
168                         if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
169                                 assert( connections[i].c_conn_state == SLAP_C_INVALID );
170                                 assert( !ber_pvt_sb_in_use( connections[i].c_sb ) );
171                                 continue;
172                         }
173
174                         assert( connections[i].c_struct_state == SLAP_C_USED );
175                         assert( connections[i].c_conn_state != SLAP_C_INVALID );
176                         assert( ber_pvt_sb_in_use( connections[i].c_sb ) );
177
178                         if( ber_pvt_sb_get_desc( connections[i].c_sb ) == s ) {
179                                 c = &connections[i];
180                                 break;
181                         }
182                 }
183         }
184 #endif
185
186         if( c != NULL ) {
187                 /* we do this BEFORE locking to aid in debugging */
188                 Debug( LDAP_DEBUG_TRACE,
189                         "connection_get(%d): got connid=%ld\n",
190                         s, c->c_connid, 0 );
191
192                 ldap_pvt_thread_mutex_lock( &c->c_mutex );
193         }
194         return c;
195 }
196
197 static void connection_return( Connection *c )
198 {
199         ldap_pvt_thread_mutex_unlock( &c->c_mutex );
200 }
201
202 long connection_init(
203         int s,
204         const char* name,
205         const char* addr)
206 {
207         long id;
208         Connection *c;
209         assert( connections != NULL );
210
211         if( s < 0 ) {
212                 return -1;
213         }
214
215         assert( s >= 0 );
216 #ifndef HAVE_WINSOCK
217         assert( s < dtblsize );
218 #endif
219
220         ldap_pvt_thread_mutex_lock( &connections_mutex );
221
222 #ifndef HAVE_WINSOCK
223         c = &connections[s];
224
225 #else
226         {
227                 int i;
228
229         for( i=0; i < dtblsize; i++) {
230             if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) {
231                 assert( connections[i].c_sb == 0 );
232                 c = &connections[i];
233                 break;
234             }
235
236             if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
237                 assert( !ber_pvt_sb_in_use( connections[i].c_sb ));
238                 c = &connections[i];
239                 break;
240             }
241
242             assert( connections[i].c_struct_state == SLAP_C_USED );
243             assert( connections[i].c_conn_state != SLAP_C_INVALID );
244             assert( ber_pvt_sb_in_use( connections[i].c_sb ));
245         }
246
247         if( c == NULL ) {
248             ldap_pvt_thread_mutex_unlock( &connections_mutex );
249             return -1;
250         }
251     }
252 #endif
253
254     assert( c != NULL );
255     assert( c->c_struct_state != SLAP_C_USED );
256     assert( c->c_conn_state == SLAP_C_INVALID );
257
258     if( c->c_struct_state == SLAP_C_UNINITIALIZED ) {
259         c->c_dn = NULL;
260         c->c_cdn = NULL;
261         c->c_client_name = NULL;
262         c->c_client_addr = NULL;
263         c->c_ops = NULL;
264         c->c_pending_ops = NULL;
265
266         c->c_sb = ber_sockbuf_alloc( );
267
268         /* should check status of thread calls */
269         ldap_pvt_thread_mutex_init( &c->c_mutex );
270         ldap_pvt_thread_mutex_init( &c->c_write_mutex );
271         ldap_pvt_thread_cond_init( &c->c_write_cv );
272
273         c->c_struct_state = SLAP_C_UNUSED;
274     }
275
276     ldap_pvt_thread_mutex_lock( &c->c_mutex );
277
278     assert( c->c_struct_state == SLAP_C_UNUSED );
279     assert(     c->c_dn == NULL );
280     assert(     c->c_cdn == NULL );
281     assert( c->c_client_name == NULL );
282     assert( c->c_client_addr == NULL );
283     assert( c->c_ops == NULL );
284     assert( c->c_pending_ops == NULL );
285
286     c->c_client_name = ch_strdup( name == NULL ? "" : name );
287     c->c_client_addr = ch_strdup( addr );
288
289     c->c_n_ops_received = 0;
290 #ifdef LDAP_COUNTERS
291     c->c_n_ops_executing = 0;
292     c->c_n_ops_pending = 0;
293     c->c_n_ops_completed = 0;
294 #endif
295
296     c->c_starttime = slap_get_time();
297
298     ber_pvt_sb_set_desc( c->c_sb, s );
299     ber_pvt_sb_set_io( c->c_sb, &ber_pvt_sb_io_tcp, NULL );
300
301     if( ber_pvt_sb_set_nonblock( c->c_sb, 1 ) < 0 ) {
302         Debug( LDAP_DEBUG_ANY,
303             "connection_init(%d, %s, %s): set nonblocking failed\n",
304             s, c->c_client_name, c->c_client_addr);
305     }
306
307     id = c->c_connid = conn_nextid++;
308
309     c->c_conn_state = SLAP_C_INACTIVE;
310     c->c_struct_state = SLAP_C_USED;
311
312     ldap_pvt_thread_mutex_unlock( &c->c_mutex );
313     ldap_pvt_thread_mutex_unlock( &connections_mutex );
314
315     return id;
316 }
317
318 static void
319 connection_destroy( Connection *c )
320 {
321         /* note: connections_mutex should be locked by caller */
322
323     assert( connections != NULL );
324     assert( c != NULL );
325     assert( c->c_struct_state != SLAP_C_UNUSED );
326     assert( c->c_conn_state != SLAP_C_INVALID );
327     assert( c->c_ops == NULL );
328
329     c->c_struct_state = SLAP_C_UNUSED;
330     c->c_conn_state = SLAP_C_INVALID;
331
332 #ifdef LDAP_COMPAT30
333     c->c_version = 0;
334 #endif
335     c->c_protocol = 0;
336
337     c->c_starttime = 0;
338
339     if(c->c_dn != NULL) {
340         free(c->c_dn);
341         c->c_dn = NULL;
342     }
343         if(c->c_cdn != NULL) {
344                 free(c->c_cdn);
345                 c->c_cdn = NULL;
346         }
347         if(c->c_client_name != NULL) {
348                 free(c->c_client_name);
349                 c->c_client_name = NULL;
350         }
351         if(c->c_client_addr != NULL) {
352                 free(c->c_client_addr);
353                 c->c_client_addr = NULL;
354         }
355
356         if ( ber_pvt_sb_in_use(c->c_sb) ) {
357                 int sd = ber_pvt_sb_get_desc(c->c_sb);
358
359                 slapd_remove( sd );
360                 ber_pvt_sb_close( c->c_sb );
361
362                 Statslog( LDAP_DEBUG_STATS,
363                     "conn=%d fd=%d closed.\n",
364                         c->c_connid, sd, 0, 0, 0 );
365         }
366
367         ber_pvt_sb_destroy( c->c_sb );
368 }
369
370 int connection_state_closing( Connection *c )
371 {
372         /* connection must be locked by caller */
373         int state;
374         assert( c != NULL );
375         assert( c->c_struct_state == SLAP_C_USED );
376
377         state = c->c_conn_state;
378
379         assert( state != SLAP_C_INVALID );
380
381         return state == SLAP_C_CLOSING;
382 }
383
384 void connection_closing( Connection *c )
385 {
386         assert( connections != NULL );
387         assert( c != NULL );
388         assert( c->c_struct_state == SLAP_C_USED );
389         assert( c->c_conn_state != SLAP_C_INVALID );
390
391         if( c->c_conn_state != SLAP_C_CLOSING ) {
392                 Operation *o;
393
394                 Debug( LDAP_DEBUG_TRACE,
395                         "connection_closing: readying conn=%ld sd=%d for close.\n",
396                         c->c_connid, ber_pvt_sb_get_desc( c->c_sb ), 0 );
397
398                 /* update state to closing */
399                 c->c_conn_state = SLAP_C_CLOSING;
400
401                 /* don't listen on this port anymore */
402                 slapd_clr_read( ber_pvt_sb_get_desc( c->c_sb ), 1 );
403
404                 /* shutdown I/O -- not yet implemented */
405
406                 /* abandon active operations */
407                 for( o = c->c_ops; o != NULL; o = o->o_next ) {
408                         ldap_pvt_thread_mutex_lock( &o->o_abandonmutex );
409                         o->o_abandon = 1;
410                         ldap_pvt_thread_mutex_unlock( &o->o_abandonmutex );
411                 }
412
413                 /* remove pending operations */
414                 for( o = slap_op_pop( &c->c_pending_ops );
415                         o != NULL;
416                         o = slap_op_pop( &c->c_pending_ops ) )
417                 {
418                         slap_op_free( o );
419                 }
420
421                 /* wake write blocked operations */
422                 slapd_clr_write( ber_pvt_sb_get_desc(c->c_sb), 1 );
423                 ldap_pvt_thread_cond_signal( &c->c_write_cv );
424         }
425 }
426
427 static void connection_close( Connection *c )
428 {
429         assert( connections != NULL );
430         assert( c != NULL );
431         assert( c->c_struct_state == SLAP_C_USED );
432         assert( c->c_conn_state == SLAP_C_CLOSING );
433
434         /* note: connections_mutex should be locked by caller */
435
436         if( c->c_ops != NULL ) {
437                 Debug( LDAP_DEBUG_TRACE,
438                         "connection_close: deferring conn=%ld sd=%d.\n",
439                         c->c_connid, ber_pvt_sb_get_desc( c->c_sb ), 0 );
440
441                 return;
442         }
443
444         Debug( LDAP_DEBUG_TRACE, "connection_close: conn=%ld sd=%d.\n",
445                 c->c_connid, ber_pvt_sb_get_desc( c->c_sb ), 0 );
446
447         connection_destroy( c );
448 }
449
450 long connections_nextid(void)
451 {
452         long id;
453         assert( connections != NULL );
454
455         ldap_pvt_thread_mutex_lock( &connections_mutex );
456
457         id = conn_nextid;
458
459         ldap_pvt_thread_mutex_unlock( &connections_mutex );
460
461         return id;
462 }
463
464 Connection* connection_first(void)
465 {
466         assert( connections != NULL );
467
468         ldap_pvt_thread_mutex_lock( &connections_mutex );
469
470         assert( conn_index == -1 );
471         conn_index = 0;
472
473         return connection_next(NULL);
474 }
475
476 Connection* connection_next(Connection *c)
477 {
478         assert( connections != NULL );
479         assert( conn_index != -1 );
480         assert( conn_index <= dtblsize );
481
482         if( c != NULL ) {
483                 ldap_pvt_thread_mutex_unlock( &c->c_mutex );
484         }
485
486         c = NULL;
487
488         for(; conn_index < dtblsize; conn_index++) {
489                 if( connections[conn_index].c_struct_state == SLAP_C_UNINITIALIZED ) {
490                         assert( connections[conn_index].c_conn_state == SLAP_C_INVALID );
491 #ifndef HAVE_WINSOCK
492                         continue;
493 #else
494                         break;
495 #endif
496                 }
497
498                 if( connections[conn_index].c_struct_state == SLAP_C_USED ) {
499                         assert( connections[conn_index].c_conn_state != SLAP_C_INVALID );
500                         c = &connections[conn_index++];
501                         break;
502                 }
503
504                 assert( connections[conn_index].c_struct_state == SLAP_C_UNUSED );
505                 assert( connections[conn_index].c_conn_state == SLAP_C_INVALID );
506         }
507
508         if( c != NULL ) {
509                 ldap_pvt_thread_mutex_lock( &c->c_mutex );
510         }
511
512         return c;
513 }
514
515 void connection_done(Connection *c)
516 {
517         assert( connections != NULL );
518         assert( conn_index != -1 );
519         assert( conn_index <= dtblsize );
520
521         if( c != NULL ) {
522                 ldap_pvt_thread_mutex_unlock( &c->c_mutex );
523         }
524
525         conn_index = -1;
526         ldap_pvt_thread_mutex_unlock( &connections_mutex );
527 }
528
529 /*
530  * connection_activity - handle the request operation op on connection
531  * conn.  This routine figures out what kind of operation it is and
532  * calls the appropriate stub to handle it.
533  */
534
535 static void *
536 connection_operation( void *arg_v )
537 {
538         struct co_arg   *arg = arg_v;
539         int tag = arg->co_op->o_tag;
540         Connection *conn = arg->co_conn;
541
542 #ifdef LDAP_COUNTERS
543         ldap_pvt_thread_mutex_lock( &num_ops_mutex );
544         num_ops_initiated++;
545         ldap_pvt_thread_mutex_unlock( &num_ops_mutex );
546 #endif
547
548         switch ( tag ) {
549         case LDAP_REQ_BIND:
550                 do_bind( conn, arg->co_op );
551                 break;
552
553 #ifdef LDAP_COMPAT30
554         case LDAP_REQ_UNBIND_30:
555 #endif
556         case LDAP_REQ_UNBIND:
557                 do_unbind( conn, arg->co_op );
558                 break;
559
560         case LDAP_REQ_ADD:
561                 do_add( conn, arg->co_op );
562                 break;
563
564 #ifdef LDAP_COMPAT30
565         case LDAP_REQ_DELETE_30:
566 #endif
567         case LDAP_REQ_DELETE:
568                 do_delete( conn, arg->co_op );
569                 break;
570
571         case LDAP_REQ_MODRDN:
572                 do_modrdn( conn, arg->co_op );
573                 break;
574
575         case LDAP_REQ_MODIFY:
576                 do_modify( conn, arg->co_op );
577                 break;
578
579         case LDAP_REQ_COMPARE:
580                 do_compare( conn, arg->co_op );
581                 break;
582
583         case LDAP_REQ_SEARCH:
584                 do_search( conn, arg->co_op );
585                 break;
586
587 #ifdef LDAP_COMPAT30
588         case LDAP_REQ_ABANDON_30:
589 #endif
590         case LDAP_REQ_ABANDON:
591                 do_abandon( conn, arg->co_op );
592                 break;
593
594         default:
595                 Debug( LDAP_DEBUG_ANY, "unknown request 0x%lx\n",
596                     arg->co_op->o_tag, 0, 0 );
597                 break;
598         }
599
600 #ifdef LDAP_COUNTERS
601         ldap_pvt_thread_mutex_lock( &num_ops_mutex );
602         num_ops_completed++;
603         ldap_pvt_thread_mutex_unlock( &num_ops_mutex );
604 #endif
605
606         ldap_pvt_thread_mutex_lock( &conn->c_mutex );
607
608 #ifdef LDAP_COUNTERS
609         conn->c_n_ops_completed++;
610 #endif
611
612         slap_op_remove( &conn->c_ops, arg->co_op );
613         slap_op_free( arg->co_op );
614         arg->co_op = NULL;
615         arg->co_conn = NULL;
616         free( (char *) arg );
617         arg = NULL;
618
619         switch( tag ) {
620 #ifdef LDAP_COMPAT30
621         case LDAP_REQ_UNBIND_30:
622 #endif
623         case LDAP_REQ_UNBIND:
624                 connection_closing( conn );
625                 break;
626
627         case LDAP_REQ_BIND:
628                 if( conn->c_conn_state == SLAP_C_BINDING) {
629                         conn->c_conn_state = SLAP_C_ACTIVE;
630                 }
631         }
632
633         if( conn->c_conn_state == SLAP_C_CLOSING ) {
634                 Debug( LDAP_DEBUG_TRACE,
635                         "connection_operation: attempting closing conn=%ld sd=%d.\n",
636                         conn->c_connid, ber_pvt_sb_get_desc( conn->c_sb ), 0 );
637
638                 connection_close( conn );
639         }
640
641         ldap_pvt_thread_mutex_lock( &active_threads_mutex );
642         active_threads--;
643         if( active_threads < 1 ) {
644                 ldap_pvt_thread_cond_signal(&active_threads_cond);
645         }
646         ldap_pvt_thread_mutex_unlock( &active_threads_mutex );
647
648         connection_resched( conn );
649
650         ldap_pvt_thread_mutex_unlock( &conn->c_mutex );
651
652         return NULL;
653 }
654
655 int connection_read(int s)
656 {
657         int rc = 0;
658         Connection *c;
659         assert( connections != NULL );
660
661         ldap_pvt_thread_mutex_lock( &connections_mutex );
662
663         c = connection_get( s );
664         if( c == NULL ) {
665                 Debug( LDAP_DEBUG_ANY,
666                         "connection_read(%d): no connection!\n",
667                         s, 0, 0 );
668                 ldap_pvt_thread_mutex_unlock( &connections_mutex );
669                 return -1;
670         }
671
672         if( c->c_conn_state == SLAP_C_CLOSING ) {
673                 Debug( LDAP_DEBUG_TRACE,
674                         "connection_read(%d): closing, ignoring input for id=%ld\n",
675                         s, c->c_connid, 0 );
676
677                 connection_return( c );
678                 ldap_pvt_thread_mutex_unlock( &connections_mutex );
679                 return 0;
680         }
681
682         Debug( LDAP_DEBUG_TRACE,
683                 "connection_read(%d): checking for input on id=%ld\n",
684                 s, c->c_connid, 0 );
685
686 #define CONNECTION_INPUT_LOOP 1
687
688 #ifdef DATA_READY_LOOP
689         while(!rc && ber_pvt_sb_data_ready(&c->c_sb))
690 #elif CONNECTION_INPUT_LOOP
691         while(!rc)
692 #endif
693         {
694                 rc = connection_input( c );
695         }
696
697         if( rc < 0 ) {
698                 Debug( LDAP_DEBUG_TRACE,
699                         "connection_read(%d): input error=%d id=%ld, closing.\n",
700                         s, rc, c->c_connid );
701
702                 connection_closing( c );
703                 connection_close( c );
704         }
705
706         connection_return( c );
707         ldap_pvt_thread_mutex_unlock( &connections_mutex );
708         return 0;
709 }
710
711 static int
712 connection_input(
713     Connection *conn
714 )
715 {
716         Operation *op;
717         unsigned long   tag, len;
718         long            msgid;
719         BerElement      *ber;
720
721         if ( conn->c_currentber == NULL && (conn->c_currentber = ber_alloc())
722             == NULL ) {
723                 Debug( LDAP_DEBUG_ANY, "ber_alloc failed\n", 0, 0, 0 );
724                 return -1;
725         }
726
727         errno = 0;
728         if ( (tag = ber_get_next( conn->c_sb, &len, conn->c_currentber ))
729             != LDAP_TAG_MESSAGE )
730         {
731                 int err = errno;
732
733                 Debug( LDAP_DEBUG_TRACE,
734                         "ber_get_next on fd %d failed errno %d (%s)\n",
735                         ber_pvt_sb_get_desc( conn->c_sb ), err,
736                         err > -1 && err < sys_nerr ?  sys_errlist[err] : "unknown" );
737                 Debug( LDAP_DEBUG_TRACE,
738                         "\t*** got %ld of %lu so far\n",
739                         (long)(conn->c_currentber->ber_rwptr - conn->c_currentber->ber_buf),
740                         conn->c_currentber->ber_len, 0 );
741
742                 if ( err != EWOULDBLOCK && err != EAGAIN ) {
743                         /* log, close and send error */
744                         ber_free( conn->c_currentber, 1 );
745                         conn->c_currentber = NULL;
746
747                         return -2;
748                 }
749                 return 1;
750         }
751
752         ber = conn->c_currentber;
753         conn->c_currentber = NULL;
754
755         if ( (tag = ber_get_int( ber, &msgid )) != LDAP_TAG_MSGID ) {
756                 /* log, close and send error */
757                 Debug( LDAP_DEBUG_ANY, "ber_get_int returns 0x%lx\n", tag, 0,
758                     0 );
759                 ber_free( ber, 1 );
760                 return -1;
761         }
762
763         if ( (tag = ber_peek_tag( ber, &len )) == LBER_ERROR ) {
764                 /* log, close and send error */
765                 Debug( LDAP_DEBUG_ANY, "ber_peek_tag returns 0x%lx\n", tag, 0,
766                     0 );
767                 ber_free( ber, 1 );
768
769                 return -1;
770         }
771
772 #ifdef LDAP_COMPAT30
773         if ( conn->c_version == 30 ) {
774                 (void) ber_skip_tag( ber, &len );
775         }
776 #endif
777
778         op = slap_op_alloc( ber, msgid, tag, conn->c_n_ops_received++ );
779
780         if ( conn->c_conn_state == SLAP_C_BINDING
781                 || conn->c_conn_state == SLAP_C_CLOSING )
782         {
783                 Debug( LDAP_DEBUG_ANY, "deferring operation\n", 0, 0, 0 );
784                 slap_op_add( &conn->c_pending_ops, op );
785
786         } else {
787                 connection_op_activate( conn, op );
788         }
789
790 #ifdef NO_THREADS
791         if ( conn->c_struct_state != SLAP_C_USED ) {
792                 /* connection must have got closed underneath us */
793                 return 1;
794         }
795 #endif
796         assert( conn->c_struct_state == SLAP_C_USED );
797
798         return 0;
799 }
800
801 static int
802 connection_resched( Connection *conn )
803 {
804         Operation *op;
805
806         if( conn->c_conn_state != SLAP_C_ACTIVE ) {
807                 /* other states need different handling */
808                 return 0;
809         }
810
811         for( op = slap_op_pop( &conn->c_pending_ops );
812                 op != NULL;
813                 op = slap_op_pop( &conn->c_pending_ops ) )
814         {
815                 /* pending operations should not be marked for abandonment */
816                 assert(!op->o_abandon);
817
818                 connection_op_activate( conn, op );
819
820                 if ( conn->c_conn_state == SLAP_C_BINDING ) {
821                         break;
822                 }
823         }
824         return 0;
825 }
826
827 static int connection_op_activate( Connection *conn, Operation *op )
828 {
829         struct co_arg *arg;
830         char *tmpdn;
831         int status;
832         unsigned long tag = op->o_tag;
833
834         if ( conn->c_dn != NULL ) {
835                 tmpdn = ch_strdup( conn->c_dn );
836         } else {
837                 tmpdn = NULL;
838         }
839
840         arg = (struct co_arg *) ch_malloc( sizeof(struct co_arg) );
841         arg->co_conn = conn;
842         arg->co_op = op;
843
844         arg->co_op->o_dn = ch_strdup( tmpdn != NULL ? tmpdn : "" );
845         arg->co_op->o_ndn = dn_normalize_case( ch_strdup( arg->co_op->o_dn ) );
846
847         slap_op_add( &conn->c_ops, arg->co_op );
848
849         if(tag == LDAP_REQ_BIND) {
850                 conn->c_conn_state = SLAP_C_BINDING;
851         }
852
853         if ( tmpdn != NULL ) {
854                 free( tmpdn );
855         }
856
857         ldap_pvt_thread_mutex_lock( &active_threads_mutex );
858         active_threads++;
859         ldap_pvt_thread_mutex_unlock( &active_threads_mutex );
860
861         status = ldap_pvt_thread_create( &arg->co_op->o_tid, 1,
862                                          connection_operation, (void *) arg );
863
864         if ( status != 0 ) {
865                 Debug( LDAP_DEBUG_ANY,
866                 "ldap_pvt_thread_create failed (%d)\n", status, 0, 0 );
867
868                 /* should move op to pending list */
869         }
870
871         return status;
872 }
873
874 int connection_write(int s)
875 {
876         Connection *c;
877         assert( connections != NULL );
878
879         ldap_pvt_thread_mutex_lock( &connections_mutex );
880
881         c = connection_get( s );
882         if( c == NULL ) {
883                 Debug( LDAP_DEBUG_ANY,
884                         "connection_write(%d): no connection!\n",
885                         s, 0, 0 );
886                 ldap_pvt_thread_mutex_unlock( &connections_mutex );
887                 return -1;
888         }
889
890         Debug( LDAP_DEBUG_TRACE,
891                 "connection_write(%d): waking output for id=%ld\n",
892                 s, c->c_connid, 0 );
893
894         ldap_pvt_thread_cond_signal( &c->c_write_cv );
895
896         connection_return( c );
897         ldap_pvt_thread_mutex_unlock( &connections_mutex );
898         return 0;
899 }