]> git.sur5r.net Git - openldap/blob - servers/slapd/connection.c
2f6759d4815c635d269ace201f65d7a1991136d3
[openldap] / servers / slapd / connection.c
1 #include "portable.h"
2
3 #include <stdio.h>
4
5 #include <ac/socket.h>
6 #include <ac/errno.h>
7 #include <ac/signal.h>
8 #include <ac/string.h>
9 #include <ac/time.h>
10
11 #include "slap.h"
12
13 /* we need LBER internals */
14 #include "../../libraries/liblber/lber-int.h"
15
16 /* protected by connections_mutex */
17 static ldap_pvt_thread_mutex_t connections_mutex;
18 static Connection *connections = NULL;
19 static int conn_index = -1;
20 static long conn_nextid = 0;
21
22 /* structure state (protected by connections_mutex) */
23 #define SLAP_C_UNINITIALIZED    0x0     /* MUST BE ZERO (0) */
24 #define SLAP_C_UNUSED                   0x1
25 #define SLAP_C_USED                             0x2
26
27 /* connection state (protected by c_mutex ) */
28 #define SLAP_C_INVALID                  0x0     /* MUST BE ZERO (0) */
29 #define SLAP_C_INACTIVE                 0x1     /* zero threads */
30 #define SLAP_C_ACTIVE                   0x2 /* one or more threads */
31 #define SLAP_C_BINDING                  0x3     /* binding */
32 #define SLAP_C_CLOSING                  0x4     /* closing */
33
34 void slapd_remove(int s);
35 static Connection* connection_get( int s );
36
37 static int connection_input( Connection *c );
38 static void connection_close( Connection *c );
39
40 static int connection_op_activate( Connection *conn, Operation *op );
41 static int connection_resched( Connection *conn );
42
43 struct co_arg {
44         Connection      *co_conn;
45         Operation       *co_op;
46 };
47
48 /*
49  * Initialize connection management infrastructure.
50  */
51 int connections_init(void)
52 {
53         int i;
54
55         assert( connections == NULL );
56
57         if( connections != NULL) {
58                 Debug( LDAP_DEBUG_ANY, "connections_init: already initialized.\n",
59                         0, 0, 0 );
60                 return -1;
61         }
62
63         /* should check return of every call */
64         ldap_pvt_thread_mutex_init( &connections_mutex );
65
66         connections = (Connection *) calloc( dtblsize, sizeof(Connection) );
67
68         if( connections == NULL ) {
69                 Debug( LDAP_DEBUG_ANY,
70                         "connections_init: allocation (%d*%ld) of connection array failed.\n",
71                         dtblsize, (long) sizeof(Connection), 0 );
72                 return -1;
73         }
74
75         for ( i = 0; i < dtblsize; i++ )
76                 memset( &connections[i], 0, sizeof(Connection) );
77
78         /*
79          * per entry initialization of the Connection array initialization
80          * will be done by connection_init()
81          */ 
82
83         return 0;
84 }
85
86 /*
87  * Destroy connection management infrastructure.
88  */
89 int connections_destroy(void)
90 {
91         int i;
92
93         /* should check return of every call */
94
95         if( connections == NULL) {
96                 Debug( LDAP_DEBUG_ANY, "connections_destroy: nothing to destroy.\n",
97                         0, 0, 0 );
98                 return -1;
99         }
100
101         for ( i = 0; i < dtblsize; i++ ) {
102                 if( connections[i].c_struct_state != SLAP_C_UNINITIALIZED ) {
103                         ldap_pvt_thread_mutex_destroy( &connections[i].c_mutex );
104                         ldap_pvt_thread_mutex_destroy( &connections[i].c_write_mutex );
105                         ldap_pvt_thread_cond_destroy( &connections[i].c_write_cv );
106                 }
107         }
108
109         free( connections );
110         connections = NULL;
111
112         ldap_pvt_thread_mutex_destroy( &connections_mutex );
113         return 0;
114 }
115
116 /*
117  * shutdown all connections
118  */
119 int connections_shutdown(void)
120 {
121         int i;
122
123         ldap_pvt_thread_mutex_lock( &connections_mutex );
124
125         for ( i = 0; i < dtblsize; i++ ) {
126                 if( connections[i].c_struct_state != SLAP_C_USED ) {
127                         continue;
128                 }
129
130                 ldap_pvt_thread_mutex_lock( &connections[i].c_mutex );
131                 connection_closing( &connections[i] );
132                 connection_close( &connections[i] );
133                 ldap_pvt_thread_mutex_unlock( &connections[i].c_mutex );
134         }
135
136         ldap_pvt_thread_mutex_unlock( &connections_mutex );
137
138         return 0;
139 }
140
141 static Connection* connection_get( int s )
142 {
143         Connection *c = NULL;
144
145         assert( connections != NULL );
146
147         if(s < 0) {
148                 return NULL;
149         }
150
151 #ifndef HAVE_WINSOCK
152         assert( connections[s].c_struct_state == SLAP_C_USED );
153         assert( connections[s].c_conn_state != SLAP_C_INVALID );
154         assert( ber_pvt_sb_in_use( connections[s].c_sb ) );
155
156         c = &connections[s];
157 #else
158         {
159                 int i;
160
161                 for(i=0; i<dtblsize; i++) {
162                         if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) {
163                                 assert( connections[i].c_conn_state == SLAP_C_INVALID );
164                                 assert( connections[i].c_sb == 0 );
165                                 break;
166                         }
167
168                         if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
169                                 assert( connections[i].c_conn_state == SLAP_C_INVALID );
170                                 assert( !ber_pvt_sb_in_use( connections[i].c_sb ) );
171                                 continue;
172                         }
173
174                         assert( connections[i].c_struct_state == SLAP_C_USED );
175                         assert( connections[i].c_conn_state != SLAP_C_INVALID );
176                         assert( ber_pvt_sb_in_use( connections[i].c_sb ) );
177
178                         if( ber_pvt_sb_get_desc( connections[i].c_sb ) == s ) {
179                                 c = &connections[i];
180                                 break;
181                         }
182                 }
183         }
184 #endif
185
186         if( c != NULL ) {
187                 /* we do this BEFORE locking to aid in debugging */
188                 Debug( LDAP_DEBUG_TRACE,
189                         "connection_get(%d): got connid=%ld\n",
190                         s, c->c_connid, 0 );
191
192                 ldap_pvt_thread_mutex_lock( &c->c_mutex );
193         }
194         return c;
195 }
196
197 static void connection_return( Connection *c )
198 {
199         ldap_pvt_thread_mutex_unlock( &c->c_mutex );
200 }
201
202 long connection_init(
203         int s,
204         const char* name,
205         const char* addr)
206 {
207         long id;
208         Connection *c;
209         assert( connections != NULL );
210
211         if( s < 0 ) {
212                 return -1;
213         }
214
215         assert( s >= 0 );
216 #ifndef HAVE_WINSOCK
217         assert( s < dtblsize );
218 #endif
219
220         ldap_pvt_thread_mutex_lock( &connections_mutex );
221
222 #ifndef HAVE_WINSOCK
223         c = &connections[s];
224
225 #else
226         {
227                 int i;
228
229         for( i=0; i < dtblsize; i++) {
230             if( connections[i].c_struct_state == SLAP_C_UNINITIALIZED ) {
231                 assert( connections[i].c_sb == 0 );
232                 c = &connections[i];
233                 break;
234             }
235
236             if( connections[i].c_struct_state == SLAP_C_UNUSED ) {
237                 assert( !ber_pvt_sb_in_use( connections[i].c_sb ));
238                 c = &connections[i];
239                 break;
240             }
241
242             assert( connections[i].c_struct_state == SLAP_C_USED );
243             assert( connections[i].c_conn_state != SLAP_C_INVALID );
244             assert( ber_pvt_sb_in_use( connections[i].c_sb ));
245         }
246
247         if( c == NULL ) {
248             ldap_pvt_thread_mutex_unlock( &connections_mutex );
249             return -1;
250         }
251     }
252 #endif
253
254     assert( c != NULL );
255     assert( c->c_struct_state != SLAP_C_USED );
256     assert( c->c_conn_state == SLAP_C_INVALID );
257
258     if( c->c_struct_state == SLAP_C_UNINITIALIZED ) {
259         c->c_dn = NULL;
260         c->c_cdn = NULL;
261         c->c_client_name = NULL;
262         c->c_client_addr = NULL;
263         c->c_ops = NULL;
264         c->c_pending_ops = NULL;
265
266         c->c_sb = ber_sockbuf_alloc( );
267
268         /* should check status of thread calls */
269         ldap_pvt_thread_mutex_init( &c->c_mutex );
270         ldap_pvt_thread_mutex_init( &c->c_write_mutex );
271         ldap_pvt_thread_cond_init( &c->c_write_cv );
272
273         c->c_struct_state = SLAP_C_UNUSED;
274     }
275
276     ldap_pvt_thread_mutex_lock( &c->c_mutex );
277
278     assert( c->c_struct_state == SLAP_C_UNUSED );
279     assert(     c->c_dn == NULL );
280     assert(     c->c_cdn == NULL );
281     assert( c->c_client_name == NULL );
282     assert( c->c_client_addr == NULL );
283     assert( c->c_ops == NULL );
284     assert( c->c_pending_ops == NULL );
285
286     c->c_client_name = ch_strdup( name == NULL ? "" : name );
287     c->c_client_addr = ch_strdup( addr );
288
289     c->c_n_ops_received = 0;
290 #ifdef LDAP_COUNTERS
291     c->c_n_ops_executing = 0;
292     c->c_n_ops_pending = 0;
293     c->c_n_ops_completed = 0;
294 #endif
295
296     c->c_starttime = slap_get_time();
297
298     ber_pvt_sb_set_desc( c->c_sb, s );
299     ber_pvt_sb_set_io( c->c_sb, &ber_pvt_sb_io_tcp, NULL );
300
301     if( ber_pvt_sb_set_nonblock( c->c_sb, 1 ) < 0 ) {
302         Debug( LDAP_DEBUG_ANY,
303             "connection_init(%d, %s, %s): set nonblocking failed\n",
304             s, c->c_client_name, c->c_client_addr);
305     }
306
307     id = c->c_connid = conn_nextid++;
308
309     c->c_conn_state = SLAP_C_INACTIVE;
310     c->c_struct_state = SLAP_C_USED;
311
312     ldap_pvt_thread_mutex_unlock( &c->c_mutex );
313     ldap_pvt_thread_mutex_unlock( &connections_mutex );
314
315     return id;
316 }
317
318 static void
319 connection_destroy( Connection *c )
320 {
321     assert( connections != NULL );
322     assert( c != NULL );
323     assert( c->c_struct_state != SLAP_C_UNUSED );
324     assert( c->c_conn_state != SLAP_C_INVALID );
325     assert( c->c_ops == NULL );
326
327         ldap_pvt_thread_mutex_lock( &connections_mutex );
328     c->c_struct_state = SLAP_C_UNUSED;
329     c->c_conn_state = SLAP_C_INVALID;
330
331     c->c_version = 0;
332     c->c_protocol = 0;
333
334     c->c_starttime = 0;
335
336     if(c->c_dn != NULL) {
337         free(c->c_dn);
338         c->c_dn = NULL;
339     }
340         if(c->c_cdn != NULL) {
341                 free(c->c_cdn);
342                 c->c_cdn = NULL;
343         }
344         if(c->c_client_name != NULL) {
345                 free(c->c_client_name);
346                 c->c_client_name = NULL;
347         }
348         if(c->c_client_addr != NULL) {
349                 free(c->c_client_addr);
350                 c->c_client_addr = NULL;
351         }
352
353         if ( ber_pvt_sb_in_use(c->c_sb) ) {
354                 int sd = ber_pvt_sb_get_desc(c->c_sb);
355
356                 slapd_remove( sd );
357                 ber_pvt_sb_close( c->c_sb );
358
359                 Statslog( LDAP_DEBUG_STATS,
360                     "conn=%d fd=%d closed.\n",
361                         c->c_connid, sd, 0, 0, 0 );
362         }
363
364         ber_pvt_sb_destroy( c->c_sb );
365         ldap_pvt_thread_mutex_unlock( &connections_mutex );
366 }
367
368 int connection_state_closing( Connection *c )
369 {
370         /* connection must be locked by caller */
371         int state;
372         assert( c != NULL );
373         assert( c->c_struct_state == SLAP_C_USED );
374
375         state = c->c_conn_state;
376
377         assert( state != SLAP_C_INVALID );
378
379         return state == SLAP_C_CLOSING;
380 }
381
382 void connection_closing( Connection *c )
383 {
384         assert( connections != NULL );
385         assert( c != NULL );
386         assert( c->c_struct_state == SLAP_C_USED );
387         assert( c->c_conn_state != SLAP_C_INVALID );
388
389         if( c->c_conn_state != SLAP_C_CLOSING ) {
390                 Operation *o;
391
392                 Debug( LDAP_DEBUG_TRACE,
393                         "connection_closing: readying conn=%ld sd=%d for close.\n",
394                         c->c_connid, ber_pvt_sb_get_desc( c->c_sb ), 0 );
395
396                 /* update state to closing */
397                 c->c_conn_state = SLAP_C_CLOSING;
398
399                 /* don't listen on this port anymore */
400                 slapd_clr_read( ber_pvt_sb_get_desc( c->c_sb ), 1 );
401
402                 /* shutdown I/O -- not yet implemented */
403
404                 /* abandon active operations */
405                 for( o = c->c_ops; o != NULL; o = o->o_next ) {
406                         ldap_pvt_thread_mutex_lock( &o->o_abandonmutex );
407                         o->o_abandon = 1;
408                         ldap_pvt_thread_mutex_unlock( &o->o_abandonmutex );
409                 }
410
411                 /* remove pending operations */
412                 for( o = slap_op_pop( &c->c_pending_ops );
413                         o != NULL;
414                         o = slap_op_pop( &c->c_pending_ops ) )
415                 {
416                         slap_op_free( o );
417                 }
418
419                 /* wake write blocked operations */
420                 slapd_clr_write( ber_pvt_sb_get_desc(c->c_sb), 1 );
421                 ldap_pvt_thread_cond_signal( &c->c_write_cv );
422         }
423 }
424
425 static void connection_close( Connection *c )
426 {
427         assert( connections != NULL );
428         assert( c != NULL );
429         assert( c->c_struct_state == SLAP_C_USED );
430         assert( c->c_conn_state == SLAP_C_CLOSING );
431
432         if( c->c_ops != NULL ) {
433                 Debug( LDAP_DEBUG_TRACE,
434                         "connection_close: deferring conn=%ld sd=%d.\n",
435                         c->c_connid, ber_pvt_sb_get_desc( c->c_sb ), 0 );
436
437                 return;
438         }
439
440         Debug( LDAP_DEBUG_TRACE, "connection_close: conn=%ld sd=%d.\n",
441                 c->c_connid, ber_pvt_sb_get_desc( c->c_sb ), 0 );
442
443         connection_destroy( c );
444 }
445
446 long connections_nextid(void)
447 {
448         long id;
449         assert( connections != NULL );
450
451         ldap_pvt_thread_mutex_lock( &connections_mutex );
452
453         id = conn_nextid;
454
455         ldap_pvt_thread_mutex_unlock( &connections_mutex );
456
457         return id;
458 }
459
460 Connection* connection_first(void)
461 {
462         assert( connections != NULL );
463
464         ldap_pvt_thread_mutex_lock( &connections_mutex );
465
466         assert( conn_index == -1 );
467         conn_index = 0;
468
469         return connection_next(NULL);
470 }
471
472 Connection* connection_next(Connection *c)
473 {
474         assert( connections != NULL );
475         assert( conn_index != -1 );
476         assert( conn_index <= dtblsize );
477
478         if( c != NULL ) {
479                 ldap_pvt_thread_mutex_unlock( &c->c_mutex );
480         }
481
482         c = NULL;
483
484         for(; conn_index < dtblsize; conn_index++) {
485                 if( connections[conn_index].c_struct_state == SLAP_C_UNINITIALIZED ) {
486                         assert( connections[conn_index].c_conn_state == SLAP_C_INVALID );
487 #ifndef HAVE_WINSOCK
488                         continue;
489 #else
490                         break;
491 #endif
492                 }
493
494                 if( connections[conn_index].c_struct_state == SLAP_C_USED ) {
495                         assert( connections[conn_index].c_conn_state != SLAP_C_INVALID );
496                         c = &connections[conn_index++];
497                         break;
498                 }
499
500                 assert( connections[conn_index].c_struct_state == SLAP_C_UNUSED );
501                 assert( connections[conn_index].c_conn_state == SLAP_C_INVALID );
502         }
503
504         if( c != NULL ) {
505                 ldap_pvt_thread_mutex_lock( &c->c_mutex );
506         }
507
508         return c;
509 }
510
511 void connection_done(Connection *c)
512 {
513         assert( connections != NULL );
514         assert( conn_index != -1 );
515         assert( conn_index <= dtblsize );
516
517         if( c != NULL ) {
518                 ldap_pvt_thread_mutex_unlock( &c->c_mutex );
519         }
520
521         conn_index = -1;
522         ldap_pvt_thread_mutex_unlock( &connections_mutex );
523 }
524
525 /*
526  * connection_activity - handle the request operation op on connection
527  * conn.  This routine figures out what kind of operation it is and
528  * calls the appropriate stub to handle it.
529  */
530
531 static void *
532 connection_operation( void *arg_v )
533 {
534         struct co_arg   *arg = arg_v;
535         int tag = arg->co_op->o_tag;
536         Connection *conn = arg->co_conn;
537
538 #ifdef LDAP_COUNTERS
539         ldap_pvt_thread_mutex_lock( &num_ops_mutex );
540         num_ops_initiated++;
541         ldap_pvt_thread_mutex_unlock( &num_ops_mutex );
542 #endif
543
544         switch ( tag ) {
545         case LDAP_REQ_BIND:
546                 do_bind( conn, arg->co_op );
547                 break;
548
549 #ifdef LDAP_COMPAT30
550         case LDAP_REQ_UNBIND_30:
551 #endif
552         case LDAP_REQ_UNBIND:
553                 do_unbind( conn, arg->co_op );
554                 break;
555
556         case LDAP_REQ_ADD:
557                 do_add( conn, arg->co_op );
558                 break;
559
560 #ifdef LDAP_COMPAT30
561         case LDAP_REQ_DELETE_30:
562 #endif
563         case LDAP_REQ_DELETE:
564                 do_delete( conn, arg->co_op );
565                 break;
566
567         case LDAP_REQ_MODRDN:
568                 do_modrdn( conn, arg->co_op );
569                 break;
570
571         case LDAP_REQ_MODIFY:
572                 do_modify( conn, arg->co_op );
573                 break;
574
575         case LDAP_REQ_COMPARE:
576                 do_compare( conn, arg->co_op );
577                 break;
578
579         case LDAP_REQ_SEARCH:
580                 do_search( conn, arg->co_op );
581                 break;
582
583 #ifdef LDAP_COMPAT30
584         case LDAP_REQ_ABANDON_30:
585 #endif
586         case LDAP_REQ_ABANDON:
587                 do_abandon( conn, arg->co_op );
588                 break;
589
590         default:
591                 Debug( LDAP_DEBUG_ANY, "unknown request 0x%lx\n",
592                     arg->co_op->o_tag, 0, 0 );
593                 break;
594         }
595
596 #ifdef LDAP_COUNTERS
597         ldap_pvt_thread_mutex_lock( &num_ops_mutex );
598         num_ops_completed++;
599         ldap_pvt_thread_mutex_unlock( &num_ops_mutex );
600 #endif
601
602         ldap_pvt_thread_mutex_lock( &conn->c_mutex );
603
604 #ifdef LDAP_COUNTERS
605         conn->c_n_ops_completed++;
606 #endif
607
608         slap_op_remove( &conn->c_ops, arg->co_op );
609         slap_op_free( arg->co_op );
610         arg->co_op = NULL;
611         arg->co_conn = NULL;
612         free( (char *) arg );
613         arg = NULL;
614
615         switch( tag ) {
616 #ifdef LDAP_COMPAT30
617         case LDAP_REQ_UNBIND_30:
618 #endif
619         case LDAP_REQ_UNBIND:
620                 connection_closing( conn );
621                 break;
622
623         case LDAP_REQ_BIND:
624                 if( conn->c_conn_state == SLAP_C_BINDING) {
625                         conn->c_conn_state = SLAP_C_ACTIVE;
626                 }
627         }
628
629         if( conn->c_conn_state == SLAP_C_CLOSING ) {
630                 Debug( LDAP_DEBUG_TRACE,
631                         "connection_operation: attempting closing conn=%ld sd=%d.\n",
632                         conn->c_connid, ber_pvt_sb_get_desc( conn->c_sb ), 0 );
633
634                 connection_close( conn );
635         }
636
637         ldap_pvt_thread_mutex_lock( &active_threads_mutex );
638         active_threads--;
639         if( active_threads < 1 ) {
640                 ldap_pvt_thread_cond_signal(&active_threads_cond);
641         }
642         ldap_pvt_thread_mutex_unlock( &active_threads_mutex );
643
644         connection_resched( conn );
645
646         ldap_pvt_thread_mutex_unlock( &conn->c_mutex );
647
648         return NULL;
649 }
650
651 int connection_read(int s)
652 {
653         int rc = 0;
654         Connection *c;
655         assert( connections != NULL );
656
657         ldap_pvt_thread_mutex_lock( &connections_mutex );
658
659         c = connection_get( s );
660         if( c == NULL ) {
661                 Debug( LDAP_DEBUG_ANY,
662                         "connection_read(%d): no connection!\n",
663                         s, 0, 0 );
664                 ldap_pvt_thread_mutex_unlock( &connections_mutex );
665                 return -1;
666         }
667
668         if( c->c_conn_state == SLAP_C_CLOSING ) {
669                 Debug( LDAP_DEBUG_TRACE,
670                         "connection_read(%d): closing, ignoring input for id=%ld\n",
671                         s, c->c_connid, 0 );
672
673                 connection_return( c );
674                 ldap_pvt_thread_mutex_unlock( &connections_mutex );
675                 return 0;
676         }
677
678         Debug( LDAP_DEBUG_TRACE,
679                 "connection_read(%d): checking for input on id=%ld\n",
680                 s, c->c_connid, 0 );
681
682 #define CONNECTION_INPUT_LOOP 1
683
684 #ifdef DATA_READY_LOOP
685         while(!rc && ber_pvt_sb_data_ready(&c->c_sb))
686 #elif CONNECTION_INPUT_LOOP
687         while(!rc)
688 #endif
689         {
690                 rc = connection_input( c );
691         }
692
693         if( rc < 0 ) {
694                 Debug( LDAP_DEBUG_TRACE,
695                         "connection_read(%d): input error=%d id=%ld, closing.\n",
696                         s, rc, c->c_connid );
697
698                 connection_closing( c );
699                 connection_close( c );
700         }
701
702         connection_return( c );
703         ldap_pvt_thread_mutex_unlock( &connections_mutex );
704         return 0;
705 }
706
707 static int
708 connection_input(
709     Connection *conn
710 )
711 {
712         Operation *op;
713         unsigned long   tag, len;
714         long            msgid;
715         BerElement      *ber;
716
717         if ( conn->c_currentber == NULL && (conn->c_currentber = ber_alloc())
718             == NULL ) {
719                 Debug( LDAP_DEBUG_ANY, "ber_alloc failed\n", 0, 0, 0 );
720                 return -1;
721         }
722
723         errno = 0;
724         if ( (tag = ber_get_next( conn->c_sb, &len, conn->c_currentber ))
725             != LDAP_TAG_MESSAGE )
726         {
727                 int err = errno;
728
729                 Debug( LDAP_DEBUG_TRACE,
730                         "ber_get_next on fd %d failed errno %d (%s)\n",
731                         ber_pvt_sb_get_desc( conn->c_sb ), err,
732                         err > -1 && err < sys_nerr ?  sys_errlist[err] : "unknown" );
733                 Debug( LDAP_DEBUG_TRACE,
734                         "\t*** got %ld of %lu so far\n",
735                         (long)(conn->c_currentber->ber_rwptr - conn->c_currentber->ber_buf),
736                         conn->c_currentber->ber_len, 0 );
737
738                 if ( err != EWOULDBLOCK && err != EAGAIN ) {
739                         /* log, close and send error */
740                         ber_free( conn->c_currentber, 1 );
741                         conn->c_currentber = NULL;
742
743                         return -2;
744                 }
745                 return 1;
746         }
747
748         ber = conn->c_currentber;
749         conn->c_currentber = NULL;
750
751         if ( (tag = ber_get_int( ber, &msgid )) != LDAP_TAG_MSGID ) {
752                 /* log, close and send error */
753                 Debug( LDAP_DEBUG_ANY, "ber_get_int returns 0x%lx\n", tag, 0,
754                     0 );
755                 ber_free( ber, 1 );
756                 return -1;
757         }
758
759         if ( (tag = ber_peek_tag( ber, &len )) == LBER_ERROR ) {
760                 /* log, close and send error */
761                 Debug( LDAP_DEBUG_ANY, "ber_peek_tag returns 0x%lx\n", tag, 0,
762                     0 );
763                 ber_free( ber, 1 );
764
765                 return -1;
766         }
767
768 #ifdef LDAP_COMPAT30
769         if ( conn->c_version == 30 ) {
770                 (void) ber_skip_tag( ber, &len );
771         }
772 #endif
773
774         op = slap_op_alloc( ber, msgid, tag, conn->c_n_ops_received++ );
775
776         if ( conn->c_conn_state == SLAP_C_BINDING
777                 || conn->c_conn_state == SLAP_C_CLOSING )
778         {
779                 Debug( LDAP_DEBUG_ANY, "deferring operation\n", 0, 0, 0 );
780                 slap_op_add( &conn->c_pending_ops, op );
781
782         } else {
783                 connection_op_activate( conn, op );
784         }
785
786 #ifdef NO_THREADS
787         if ( conn->c_struct_state != SLAP_C_USED ) {
788                 /* connection must have got closed underneath us */
789                 return 1;
790         }
791 #endif
792         assert( conn->c_struct_state == SLAP_C_USED );
793
794         return 0;
795 }
796
797 static int
798 connection_resched( Connection *conn )
799 {
800         Operation *op;
801
802         if( conn->c_conn_state != SLAP_C_ACTIVE ) {
803                 /* other states need different handling */
804                 return 0;
805         }
806
807         for( op = slap_op_pop( &conn->c_pending_ops );
808                 op != NULL;
809                 op = slap_op_pop( &conn->c_pending_ops ) )
810         {
811                 /* pending operations should not be marked for abandonment */
812                 assert(!op->o_abandon);
813
814                 connection_op_activate( conn, op );
815
816                 if ( conn->c_conn_state == SLAP_C_BINDING ) {
817                         break;
818                 }
819         }
820         return 0;
821 }
822
823 static int connection_op_activate( Connection *conn, Operation *op )
824 {
825         struct co_arg *arg;
826         char *tmpdn;
827         int status;
828         unsigned long tag = op->o_tag;
829
830         if ( conn->c_dn != NULL ) {
831                 tmpdn = ch_strdup( conn->c_dn );
832         } else {
833                 tmpdn = NULL;
834         }
835
836         arg = (struct co_arg *) ch_malloc( sizeof(struct co_arg) );
837         arg->co_conn = conn;
838         arg->co_op = op;
839
840         arg->co_op->o_dn = ch_strdup( tmpdn != NULL ? tmpdn : "" );
841         arg->co_op->o_ndn = dn_normalize_case( ch_strdup( arg->co_op->o_dn ) );
842
843         slap_op_add( &conn->c_ops, arg->co_op );
844
845         if(tag == LDAP_REQ_BIND) {
846                 conn->c_conn_state = SLAP_C_BINDING;
847         }
848
849         if ( tmpdn != NULL ) {
850                 free( tmpdn );
851         }
852
853         ldap_pvt_thread_mutex_lock( &active_threads_mutex );
854         active_threads++;
855         ldap_pvt_thread_mutex_unlock( &active_threads_mutex );
856
857         status = ldap_pvt_thread_create( &arg->co_op->o_tid, 1,
858                                          connection_operation, (void *) arg );
859
860         if ( status != 0 ) {
861                 Debug( LDAP_DEBUG_ANY,
862                 "ldap_pvt_thread_create failed (%d)\n", status, 0, 0 );
863
864                 /* should move op to pending list */
865         }
866
867         return status;
868 }
869
870 int connection_write(int s)
871 {
872         Connection *c;
873         assert( connections != NULL );
874
875         ldap_pvt_thread_mutex_lock( &connections_mutex );
876
877         c = connection_get( s );
878         if( c == NULL ) {
879                 Debug( LDAP_DEBUG_ANY,
880                         "connection_write(%d): no connection!\n",
881                         s, 0, 0 );
882                 ldap_pvt_thread_mutex_unlock( &connections_mutex );
883                 return -1;
884         }
885
886         Debug( LDAP_DEBUG_TRACE,
887                 "connection_write(%d): waking output for id=%ld\n",
888                 s, c->c_connid, 0 );
889
890         ldap_pvt_thread_cond_signal( &c->c_write_cv );
891
892         connection_return( c );
893         ldap_pvt_thread_mutex_unlock( &connections_mutex );
894         return 0;
895 }