- added Sean's opendht entry
[codemux.git] / codemux.c
1 #include <sys/types.h>
2 #include <sys/socket.h>
3 #include <sys/stat.h>
4 #include <sys/wait.h>
5 #include <netinet/in.h>
6 #include <arpa/inet.h>
7 #include <ctype.h>
8 #include <errno.h>
9 #include <fcntl.h>
10 #include <signal.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <time.h>
14 #include <unistd.h>
15 #include <string.h>
16 #include "codemuxlib.h"
17
18 #define DEBUG 0
19
20 #ifdef DEBUG
21 #define TRACE(fmt, msg...) fprintf(stderr, "[%s,%d] " fmt, __FUNCTION__, __LINE__, ##msg)
22 #else
23 #define TRACE(fmt, msg...) (void)0
24 #endif
25
26 #define CONF_FILE "/etc/codemux/codemux.conf"
27 #define DEMUX_PORT 80
28 #define PIDFILE "/var/run/codemux.pid"
29 #define TARG_SETSIZE 4096
30
31 /* set aside some small number of fds for us, allow the rest for
32    connections */
33 #define MAX_CONNS ((TARG_SETSIZE-20)/2)
34
35 /* no single service can take more than half the connections */
36 #define SERVICE_MAX (MAX_CONNS/2)
37
38 /* how many total connections before we get concerned about fairness
39    among them */
40 #define FAIRNESS_CUTOFF (MAX_CONNS * 0.85)
41
42 /* codemux version */
43 #define CODEMUX_VERSION "0.3"
44
45 typedef struct FlowBuf {
46   int fb_refs;                  /* num refs */
47   char *fb_buf;                 /* actual buffer */
48   int fb_used;                  /* bytes used in buffer */
49 } FlowBuf;
50 #define FB_SIZE 3800            /* max usable size */
51 #define FB_ALLOCSIZE 4000       /* extra to include IP address */
52
53 typedef struct SockInfo {
54   int si_peerFd;                /* fd of peer */
55   struct in_addr si_cliAddr;    /* address of client */
56   int si_blocked;               /* are we blocked? */
57   int si_needsHeaderSince;      /* since when are we waiting for a header */
58   int si_whichService;          /* index of service */
59   FlowBuf *si_readBuf;          /* read data into this buffer */
60   FlowBuf *si_writeBuf;         /* drain this buffer for writing */
61 } SockInfo;
62
63 static SockInfo sockInfo[TARG_SETSIZE]; /* fd number of peer socket */
64
65 typedef struct ServiceSig {
66   char *ss_host;                /* suffix in host */
67   char *ss_slice;
68   short ss_port;
69   int ss_slicePos;              /* position in slices array */
70 } ServiceSig;
71
72 static ServiceSig *serviceSig;
73 static int numServices;
74 static int confFileReadTime;
75 static int now;
76
77 typedef struct SliceInfo {
78   char *si_sliceName;
79   int si_inUse;                 /* do any services refer to this? */
80   int si_numConns;
81   int si_xid;
82 } SliceInfo;
83
84 static SliceInfo *slices;
85 static int numSlices;
86 static int numActiveSlices;
87 static int numTotalSliceConns;
88 static int anySliceXidsNeeded;
89
90 typedef struct OurFDSet {
91   long __fds_bits[TARG_SETSIZE/32];
92 } OurFDSet;
93 static OurFDSet masterReadSet, masterWriteSet;
94 static int highestSetFd;
95 static int numNeedingHeaders;   /* how many conns waiting on headers? */
96
97 static int numForks;
98
99 #ifndef SO_SETXID
100 #define SO_SETXID SO_PEERCRED
101 #endif
102 /*-----------------------------------------------------------------*/
103 static SliceInfo *
104 ServiceToSlice(int whichService)
105 {
106   if (whichService < 0)
107     return(NULL);
108   return(&slices[serviceSig[whichService].ss_slicePos]);
109 }
110 /*-----------------------------------------------------------------*/
111 static void
112 DumpStatus(int fd)
113 {
114   char buf[65535];
115   char *start = buf;
116   int i;
117   int len;
118
119   sprintf(start, 
120           "CoDemux version %s\n"
121           "numForks %d, numActiveSlices %d, numTotalSliceConns %d\n"
122           "numNeedingHeaders %d, anySliceXidsNeeded %d\n",
123           CODEMUX_VERSION,
124           numForks, numActiveSlices, numTotalSliceConns,
125           numNeedingHeaders, anySliceXidsNeeded);
126   start += strlen(start);
127
128   for (i = 0; i < numSlices; i++) {
129     SliceInfo *si = &slices[i];
130     sprintf(start, "Slice %d: %s xid %d, %d conns, inUse %d\n", 
131             i, si->si_sliceName, si->si_xid, si->si_numConns,
132             si->si_inUse);
133     start += strlen(start);
134   }
135
136   for (i = 0; i < numServices; i++) {
137     ServiceSig *ss = &serviceSig[i];
138     sprintf(start, "Service %d: %s %s port %d, slice# %d\n", i, ss->ss_host,
139             ss->ss_slice, (int) ss->ss_port, ss->ss_slicePos);
140     start += strlen(start);
141   }
142
143   len = start - buf;
144   write(fd, buf, len);
145 }
146 /*-----------------------------------------------------------------*/
147 static void
148 GetSliceXids(void)
149 {
150   /* walks through /etc/passwd, and gets the uid for every slice we
151      have */
152   FILE *f;
153   char *line;
154   int i;
155
156   if (!anySliceXidsNeeded)
157     return;
158
159   for (i = 0; i < numSlices; i++) {
160     SliceInfo *si = &slices[i];
161     si->si_inUse = 0;
162   }
163   for (i = 0; i < numServices; i++) {
164     SliceInfo *si = ServiceToSlice(i);
165     if (si != NULL)
166       si->si_inUse++;
167   }  
168
169   if ((f = fopen("/etc/passwd", "r")) == NULL)
170     return;
171
172   while ((line = GetNextLine(f)) != NULL) {
173     char *temp;
174     int xid;
175
176     if ((temp = strchr(line, ':')) == NULL)
177       continue;                 /* weird line */
178     *temp = '\0';               /* terminate slice name */
179     temp++;
180     if ((temp = strchr(temp+1, ':')) == NULL)
181       continue;                 /* weird line */
182     if ((xid = atoi(temp+1)) < 1)
183       continue;                 /* weird xid */
184
185     /* we've got a slice name and xid, let's try to match */
186     for (i = 0; i < numSlices; i++) {
187       if (slices[i].si_xid == 0 &&
188           strcasecmp(slices[i].si_sliceName, line) == 0) {
189         slices[i].si_xid = xid;
190         break;
191       }
192     }
193   }
194
195   /* assume service 0 is the root service, and don't check it since
196      it'll have xid zero */
197   anySliceXidsNeeded = FALSE;
198   for (i = 1; i < numSlices; i++) {
199     if (slices[i].si_xid == 0 && slices[i].si_inUse > 0) {
200       anySliceXidsNeeded = TRUE;
201       break;
202     }
203   }
204
205   fclose(f);
206 }
207 /*-----------------------------------------------------------------*/
208 static void
209 SliceConnsInc(int whichService)
210 {
211   SliceInfo *si = ServiceToSlice(whichService);
212
213   if (si == NULL)
214     return;
215   numTotalSliceConns++;
216   si->si_numConns++;
217   if (si->si_numConns == 1)
218     numActiveSlices++;
219 }
220 /*-----------------------------------------------------------------*/
221 static void
222 SliceConnsDec(int whichService)
223 {
224   SliceInfo *si = ServiceToSlice(whichService);
225
226   if (si == NULL)
227     return;
228   numTotalSliceConns--;
229   si->si_numConns--;
230   if (si->si_numConns == 0)
231     numActiveSlices--;
232 }
233 /*-----------------------------------------------------------------*/
234 static int
235 WhichSlicePos(char *slice)
236 {
237   /* adds the new slice if necessary, returns the index into slice
238      array. Never change the ordering of existing slices */
239   int i;
240   static int numSlicesAlloc;
241
242   for (i = 0; i < numSlices; i++) {
243     if (strcasecmp(slice, slices[i].si_sliceName) == 0)
244       return(i);
245   }
246
247   if (numSlices >= numSlicesAlloc) {
248     numSlicesAlloc = MAX(8, numSlicesAlloc * 2);
249     slices = realloc(slices, numSlicesAlloc * sizeof(SliceInfo));
250   }
251
252   memset(&slices[numSlices], 0, sizeof(SliceInfo));
253   slices[numSlices].si_sliceName = strdup(slice);
254   numSlices++;
255   return(numSlices-1);
256 }
257 /*-----------------------------------------------------------------*/
258 static void
259 ReadConfFile(void)
260 {
261   int numAlloc = 0;
262   int num = 0;
263   ServiceSig *servs = NULL;
264   FILE *f;
265   char *line = NULL;
266   struct stat statBuf;
267   int i;
268
269   if (stat(CONF_FILE, &statBuf) != 0) {
270     fprintf(stderr, "failed stat on codemux.conf\n");
271     if (numServices)
272       return;
273     exit(-1);
274   }
275   if (statBuf.st_mtime == confFileReadTime)
276     return;
277
278   if ((f = fopen(CONF_FILE, "r")) == NULL) {
279     fprintf(stderr, "failed reading codemux.conf\n");
280     if (numServices)
281       return;
282     exit(-1);
283   }
284
285   /* conf file entries look like
286      coblitz.codeen.org princeton_coblitz 3125
287   */
288
289   while (1) {
290     ServiceSig serv;
291     int port;
292     if (line != NULL)
293       free(line);
294     
295     if ((line = GetNextLine(f)) == NULL)
296       break;
297
298     memset(&serv, 0, sizeof(serv));
299     if (WordCount(line) != 3) {
300       fprintf(stderr, "bad line: %s\n", line);
301       continue;
302     }
303     serv.ss_port = port = atoi(GetField(line, 2));
304     if (port < 1 || port > 65535 || port == DEMUX_PORT) {
305       fprintf(stderr, "bad port: %s\n", line);
306       continue;
307     }
308
309     serv.ss_host = GetWord(line, 0);
310     serv.ss_slice = GetWord(line, 1);
311
312     if (num == 0 && /* the first row must be an entry for apache */
313         (strcmp(serv.ss_host, "*") != 0 ||
314          strcmp(serv.ss_slice, "root") != 0)) {
315       fprintf(stderr, "first row has to be for webserver\n");
316       exit(-1);
317     }
318     if (num >= numAlloc) {
319       numAlloc = MAX(numAlloc * 2, 8);
320       servs = realloc(servs, numAlloc * sizeof(ServiceSig));
321     }
322     serv.ss_slicePos = WhichSlicePos(serv.ss_slice);
323     if (slices[serv.ss_slicePos].si_inUse == 0 &&
324         slices[serv.ss_slicePos].si_xid < 1)
325       anySliceXidsNeeded = TRUE; /* if new/inactive, we need xid */
326     servs[num] = serv;
327     num++;
328   }
329
330   fclose(f);
331
332   if (num == 1) {
333     if (numServices == 0) {
334       fprintf(stderr, "nothing found in codemux.conf\n");
335       exit(-1);
336     }
337     return;
338   }
339
340   for (i = 0; i < numServices; i++) {
341     free(serviceSig[i].ss_host);
342     free(serviceSig[i].ss_slice);
343   }
344   free(serviceSig);
345   serviceSig = servs;
346   numServices = num;
347   confFileReadTime = statBuf.st_mtime;
348 }
349 /*-----------------------------------------------------------------*/
350 static char *err400BadRequest =
351 "HTTP/1.0 400 Bad Request\r\n"
352 "Content-Type: text/html\r\n"
353 "\r\n"
354 "You are trying to access a PlanetLab node, and your\n"
355 "request header exceeded the allowable size. Please\n"
356 "try again if you believe this error is temporary.\n";
357 /*-----------------------------------------------------------------*/
358 static char *err503Unavailable =
359 "HTTP/1.0 503 Service Unavailable\r\n"
360 "Content-Type: text/html\r\n"
361 "\r\n"
362 "You are trying to access a PlanetLab node, but the service\n"
363 "seems to be unavailable at the moment. Please try again.\n";
364 /*-----------------------------------------------------------------*/
365 static char *err503TooBusy =
366 "HTTP/1.0 503 Service Unavailable\r\n"
367 "Content-Type: text/html\r\n"
368 "\r\n"
369 "You are trying to access a PlanetLab node, but the service\n"
370 "seems to be overloaded at the moment. Please try again.\n";
371 /*-----------------------------------------------------------------*/
372 static void
373 SetFd(int fd, OurFDSet *set)
374 {
375   if (highestSetFd < fd)
376     highestSetFd = fd;
377   FD_SET(fd, set);
378 }
379 /*-----------------------------------------------------------------*/
380 static void
381 ClearFd(int fd, OurFDSet *set)
382 {
383   FD_CLR(fd, set);
384 }
385 /*-----------------------------------------------------------------*/
386 static int
387 RemoveHeader(char *lower, char *real, int totalSize, char *header)
388 {
389   /* returns number of characters removed */
390   char h2[256];
391   int start, end, len;
392   char *temp, *conn;
393
394   sprintf(h2, "\n%s", header);
395
396   if ((conn = strstr(lower, h2)) == NULL)
397     return(0);
398
399   conn++;
400   /* determine how many characters to remove */
401   if ((temp = strchr(conn, '\n')) != NULL)
402     len = (temp - conn) + 1;
403   else
404     len = strlen(conn) + 1;
405   start = conn - lower;
406   end = start + len;
407   memmove(&real[start], &real[end], totalSize - end);
408   memmove(&lower[start], &lower[end], totalSize - end);
409
410   return(len);
411 }
412 /*-----------------------------------------------------------------*/
413 static int
414 InsertHeader(char *buf, int totalSize, char *header)
415 {
416   /* returns number of bytes inserted */
417   
418   char h2[256];
419   char *temp;
420   int len;
421   
422   sprintf(h2, "%s\r\n", header);
423   len = strlen(h2);
424
425   /* if we don't encounter a \n, it means that we have only a single
426      line, and we'd converted the \n to a \0 */
427   if ((temp = strchr(buf, '\n')) == NULL)
428     temp = strchr(buf, '\0');
429   temp++;
430   
431   memmove(temp + len, temp, totalSize - (temp - buf));
432   memcpy(temp, h2, len);
433   
434   return(len);
435 }
436 /*-----------------------------------------------------------------*/
437 static int
438 FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
439 {
440   char *end;
441   char *lowerBuf;
442   char *hostVal;
443   char *buf = fb->fb_buf;
444   char orig[256];
445 #if 0
446   char *url;
447   int i;
448   int len;
449 #endif
450     
451   if (strstr(buf, "\n\r\n") == NULL && strstr(buf, "\n\n") == NULL)
452     return(FAILURE);
453
454   /* insert client info after first line */
455   sprintf(orig, "X-CoDemux-Client: %s", inet_ntoa(addr));
456   fb->fb_used += InsertHeader(buf, fb->fb_used + 1, orig);
457     
458   /* get just the header, so we can work on it */
459   LOCAL_STR_DUP_LOWER(lowerBuf, buf);
460   if ((end = strstr(lowerBuf, "\n\r\n")) == NULL)
461     end = strstr(lowerBuf, "\n\n");
462   *end = '\0';
463   
464   /* remove any existing connection, keep-alive headers, add ours */
465   fb->fb_used -= RemoveHeader(lowerBuf, buf, fb->fb_used + 1, "keep-alive:");
466   fb->fb_used -= RemoveHeader(lowerBuf, buf, fb->fb_used + 1, "connection:");
467   fb->fb_used += InsertHeader(buf, fb->fb_used + 1, "Connection: close");
468   InsertHeader(lowerBuf, fb->fb_used + 1, "connection: close");
469
470   /* isolate host, see if it matches */
471   if ((hostVal = strstr(lowerBuf, "\nhost:")) != NULL) {
472     int i;
473     hostVal += strlen("\nhost:");
474     if ((end = strchr(hostVal, '\n')) != NULL)
475       *end = '\0';
476     if ((end = strchr(hostVal, ':')) != NULL)
477       *end = '\0';
478     while (isspace(*hostVal))
479       hostVal++;
480     if (strlen(hostVal) > 0) {
481       hostVal = GetWord(hostVal, 0);
482       for (i = 1; i < numServices; i++) {
483         if (serviceSig[i].ss_host != NULL &&
484             DoesDotlessSuffixMatch(hostVal, 0, serviceSig[i].ss_host)) {
485           *whichService = i;
486           free(hostVal);
487           /* printf("%s", buf); */
488           return(SUCCESS);
489         }
490       }
491       free(hostVal);
492     }
493   }
494
495 #if 0
496   /* see if URL prefix matches */
497   if ((end = strchr(lowerBuf, '\n')) != NULL)
498     *end = 0;
499   if ((url = GetField(lowerBuf, 1)) == NULL ||
500       url[0] != '/') {
501     /* bad request - let apache handle it ? */
502     *whichService = 0;
503     return(SUCCESS);
504   }
505   url++;                        /* skip the leading slash */
506   for (i = 1; i < numServices; i++) {
507     if (serviceSig[i].ss_prefix != NULL &&
508         (len = strlen(serviceSig[i].ss_prefix)) > 0 &&
509         strncmp(url, serviceSig[i].ss_prefix, len) == 0 &&
510         (url[len] == ' ' || url[len] == '/')) {
511       int startPos = url - lowerBuf;
512       int stripLen = len + ((url[len] == '/') ? 1 : 0);
513       /* strip out prefix */
514       fb->fb_used -= stripLen;
515       memmove(&buf[startPos], &buf[startPos+stripLen], 
516               fb->fb_used + 1 - startPos);
517       /* printf("%s", buf); */
518       *whichService = i;
519       return(SUCCESS);
520     }
521   }
522 #endif
523
524   /* default to first service */
525   *whichService = 0;
526   return(SUCCESS);
527 }
528 /*-----------------------------------------------------------------*/
529 static int
530 StartConnect(int origFD, int whichService)
531 {
532   int sock;
533   struct sockaddr_in dest;
534   SockInfo *si;
535
536   /* create socket */
537   if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
538     return(FAILURE);
539   }
540   
541   /* make socket non-blocking */
542   if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) {
543     close(sock);
544     return(FAILURE);
545   }
546   
547   /* set addr structure */
548   memset(&dest, 0, sizeof(dest));
549   dest.sin_family = AF_INET;
550   dest.sin_port = htons(serviceSig[whichService].ss_port);
551   dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
552   
553   /* start connection process - we should be told that it's in
554      progress */
555   if (connect(sock, (struct sockaddr *) &dest, sizeof(dest)) != -1 || 
556       errno != EINPROGRESS) {
557     close(sock);
558     return(FAILURE);
559   }
560
561   SetFd(sock, &masterWriteSet); /* determine when connect finishes */
562   sockInfo[origFD].si_peerFd = sock;
563   si = &sockInfo[sock];
564   memset(si, 0, sizeof(SockInfo));
565   si->si_peerFd = origFD;
566   si->si_blocked = TRUE;        /* still connecting */
567   si->si_whichService = whichService;
568   si->si_writeBuf = sockInfo[origFD].si_readBuf;
569   sockInfo[origFD].si_readBuf->fb_refs++;
570   if (whichService >= 0)
571     SliceConnsInc(whichService);
572
573   return(SUCCESS);
574 }
575 /*-----------------------------------------------------------------*/
576 static int
577 WriteAvailData(int fd)
578 {
579   SockInfo *si = &sockInfo[fd];
580   FlowBuf *fb = si->si_writeBuf;
581   int res;
582
583   /* printf("trying to write fd %d\n", fd); */
584   if (fb->fb_used < 1 || si->si_blocked)
585     return(SUCCESS);
586
587   /* printf("trying to write %d bytes\n", fb->fb_used); */
588   /* write(STDOUT_FILENO, fb->fb_buf, fb->fb_used); */
589   if ((res = write(fd, fb->fb_buf, fb->fb_used)) > 0) {
590     fb->fb_used -= res;
591     if (fb->fb_used > 0) {
592       /* couldn't write all - assume blocked */
593       memmove(fb->fb_buf, &fb->fb_buf[res], fb->fb_used);
594       si->si_blocked = TRUE;
595       SetFd(fd, &masterWriteSet);
596     }
597     /* printf("wrote %d\n", res); */
598     return(SUCCESS);
599   }
600
601   /* we might have been full but didn't realize it */
602   if (res == -1 && errno == EAGAIN) {
603     si->si_blocked = TRUE;
604     SetFd(fd, &masterWriteSet);
605     return(SUCCESS);
606   }
607
608   /* otherwise, assume the worst */
609   return(FAILURE);
610 }
611 /*-----------------------------------------------------------------*/
612 static OurFDSet socksToCloseVec;
613 static int numSocksToClose;
614 static int whichSocksToClose[TARG_SETSIZE];
615 /*-----------------------------------------------------------------*/
616 static void
617 CloseSock(int fd)
618 {
619   if (FD_ISSET(fd, &socksToCloseVec))
620     return;
621   SetFd(fd, &socksToCloseVec);
622   whichSocksToClose[numSocksToClose] = fd;
623   numSocksToClose++;
624 }
625 /*-----------------------------------------------------------------*/
626 static void
627 DecBuf(FlowBuf *buf)
628 {
629   if (buf == NULL)
630     return;
631   buf->fb_refs--;
632   if (buf->fb_refs == 0) {
633     free(buf->fb_buf);
634     free(buf);
635   }
636 }
637 /*-----------------------------------------------------------------*/
638 static void
639 ReallyCloseSocks(void)
640 {
641   int i;
642
643   memset(&socksToCloseVec, 0, sizeof(socksToCloseVec));
644
645   for (i = 0; i < numSocksToClose; i++) {
646     int fd = whichSocksToClose[i];
647     close(fd);
648     DecBuf(sockInfo[fd].si_readBuf);
649     DecBuf(sockInfo[fd].si_writeBuf);
650     ClearFd(fd, &masterReadSet);
651     ClearFd(fd, &masterWriteSet);
652     if (sockInfo[fd].si_needsHeaderSince) {
653       sockInfo[fd].si_needsHeaderSince = 0;
654       numNeedingHeaders--;
655     }
656     if (sockInfo[fd].si_whichService >= 0) {
657       SliceConnsDec(sockInfo[fd].si_whichService);
658       sockInfo[fd].si_whichService = -1;
659     }
660     /* KyoungSoo*/
661     if (sockInfo[fd].si_peerFd >= 0) {
662       sockInfo[sockInfo[fd].si_peerFd].si_peerFd = -1;
663     }
664   }
665   numSocksToClose = 0;
666 }
667 /*-----------------------------------------------------------------*/
668 static void
669 SocketReadyToRead(int fd)
670 {
671   SockInfo *si = &sockInfo[fd];
672   int spaceLeft;
673   FlowBuf *fb;
674   int res;
675
676   /* if peer is closed, close ourselves */
677   if (si->si_peerFd < 0 && (!si->si_needsHeaderSince)) {
678     CloseSock(fd);
679     return;
680   }
681
682   if ((fb = si->si_readBuf) == NULL) {
683     fb = si->si_readBuf = calloc(1, sizeof(FlowBuf));
684     fb->fb_refs = 1;
685     if (si->si_peerFd >= 0) {
686       sockInfo[si->si_peerFd].si_writeBuf = fb;
687       fb->fb_refs = 2;
688     }
689   }
690
691   if (fb->fb_buf == NULL)
692     fb->fb_buf = malloc(FB_ALLOCSIZE);
693
694   /* determine read buffer size - if 0, then block reads and return */
695   if ((spaceLeft = FB_SIZE - fb->fb_used) <= 0) {
696     if (si->si_needsHeaderSince) {
697       write(fd, err400BadRequest, strlen(err400BadRequest));
698       CloseSock(fd);
699       return;
700     }
701     else {
702       ClearFd(fd, &masterReadSet);
703       return;
704     }
705   } 
706   
707   /* read as much as allowed, and is available */
708   if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) {
709     CloseSock(fd);
710     if (fb->fb_used == 0 && si->si_peerFd >= 0) {
711       CloseSock(si->si_peerFd);
712       si->si_peerFd = -1;
713     }
714     return;
715   }
716   if (res == -1) {
717     if (errno == EAGAIN)
718       return;
719     TRACE("fd=%d errno=%d errstr=%s\n",fd, errno, strerror(errno));
720     CloseSock(fd);
721     if (fb->fb_used == 0 && si->si_peerFd >= 0) {
722       CloseSock(si->si_peerFd);
723       si->si_peerFd = -1;
724     }
725     return;
726   }
727   fb->fb_used += res;
728   fb->fb_buf[fb->fb_used] = 0;  /* terminate it for convenience */
729   //  printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
730
731   /* if we need header, check if we've gotten it. if so, do
732      modifications and continue. if not, check if we've read the
733      maximum, and if so, fail */
734   if (si->si_needsHeaderSince) {
735     int whichService;
736     SliceInfo *slice;
737
738 #define STATUS_REQ "GET /codemux/status.txt"
739     if (strncasecmp(fb->fb_buf, STATUS_REQ, sizeof(STATUS_REQ)-1) == 0) {
740       DumpStatus(fd);
741       CloseSock(fd);
742       return;
743     }
744
745     //    printf("trying to find service\n");
746     if (FindService(fb, &whichService, si->si_cliAddr) != SUCCESS)
747       return;
748     //    printf("found service %d\n", whichService);
749     slice = ServiceToSlice(whichService);
750
751     /* no service can have more than some absolute max number of
752        connections. Also, when we're too busy, start enforcing
753        fairness across the servers */
754     if (slice->si_numConns > SERVICE_MAX ||
755         (numTotalSliceConns > FAIRNESS_CUTOFF && 
756          slice->si_numConns > MAX_CONNS/numActiveSlices)) {
757       write(fd, err503TooBusy, strlen(err503TooBusy));
758       TRACE("CloseSock(): fd=%d too busy\n", fd);
759       CloseSock(fd);
760       return;
761     }
762
763     if (slice->si_xid > 0) {
764       static int first = 1;
765       setsockopt(fd, SOL_SOCKET, SO_SETXID, 
766                  &slice->si_xid, sizeof(slice->si_xid));
767       if (first) {
768         /* just to log it for once */
769         fprintf(stderr, "setsockopt() with XID = %d name = %s\n", 
770                 slice->si_xid, slice->si_sliceName);
771         first = 0;
772       }
773     }
774
775     si->si_needsHeaderSince = 0;
776     numNeedingHeaders--;
777     if (StartConnect(fd, whichService) != SUCCESS) {
778       write(fd, err503Unavailable, strlen(err503Unavailable));
779       TRACE("CloseSock(): fd=%d StartConnect() failed\n", fd);
780       CloseSock(fd);
781       return;
782     }
783     return;
784   }
785
786   /* write anything possible */
787   if (WriteAvailData(si->si_peerFd) != SUCCESS) {
788     /* assume the worst and close */
789     TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
790           fd, errno, strerror(errno));
791     CloseSock(fd);
792     if (si->si_peerFd >=0) {
793       CloseSock(si->si_peerFd);
794       si->si_peerFd = -1;
795     }
796   }
797 }
798 /*-----------------------------------------------------------------*/
799 static void
800 SocketReadyToWrite(int fd)
801 {
802   SockInfo *si = &sockInfo[fd];
803
804   /* unblock it and read what it has */
805   si->si_blocked = FALSE;
806   ClearFd(fd, &masterWriteSet);
807   SetFd(fd, &masterReadSet);
808   
809   /* enable reading on peer just in case it was off */
810   if (si->si_peerFd >= 0)
811     SetFd(si->si_peerFd, &masterReadSet);
812     
813   /* if we have data, write it */
814   if (WriteAvailData(fd) != SUCCESS) {
815    /* assume the worst and close */
816     TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
817           fd, errno, strerror(errno));
818     CloseSock(fd);
819     if (si->si_peerFd >= 0) {
820       CloseSock(si->si_peerFd);
821       si->si_peerFd = -1;
822     }
823     return;
824   }
825
826   /* if peer is closed and we're done writing, we should close */
827   if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0) {
828     CloseSock(fd);
829   }
830 }
831 /*-----------------------------------------------------------------*/
832 static void
833 CloseReqlessConns(void)
834 {
835   static int lastSweep;
836   int maxAge;
837   int i;
838
839   if (lastSweep == now)
840     return;
841   lastSweep = now;
842
843   if (numTotalSliceConns + numNeedingHeaders > MAX_CONNS ||
844       numNeedingHeaders > TARG_SETSIZE/20) {
845     /* second condition is probably an attack - close aggressively */
846     maxAge = 5;
847   }
848   else if (numTotalSliceConns + numNeedingHeaders > FAIRNESS_CUTOFF ||
849            numNeedingHeaders > TARG_SETSIZE/40) {
850     /* sweep a little aggressively */
851     maxAge = 10;
852   }
853   else if (numNeedingHeaders > TARG_SETSIZE/80) {
854     /* just sweep to close strays */
855     maxAge = 30;
856   }
857   else {
858     /* too little gained - not worth sweeping */
859     return;
860   }
861
862   /* if it's too old, close it */
863   for (i = 0; i < highestSetFd+1; i++) {
864     if (sockInfo[i].si_needsHeaderSince &&
865         (now - sockInfo[i].si_needsHeaderSince) > maxAge) 
866       CloseSock(i);
867   }
868 }
869 /*-----------------------------------------------------------------*/
870 static void
871 MainLoop(int lisSock)
872 {
873   int i;
874   OurFDSet tempReadSet, tempWriteSet;
875   int res;
876   int lastConfCheck = 0;
877
878   signal(SIGPIPE, SIG_IGN);
879
880   while (1) {
881     int newSock;
882     int ceiling;
883     struct timeval timeout;
884
885     now = time(NULL);
886
887     if (now - lastConfCheck > 300) {
888       ReadConfFile();
889       GetSliceXids();           /* always call - in case new slices created */
890       lastConfCheck = now;
891     }
892
893     /* see if there's any activity */
894     tempReadSet = masterReadSet;
895     tempWriteSet = masterWriteSet;
896
897     /* trim it down if needed */
898     while (highestSetFd > 1 &&
899            (!FD_ISSET(highestSetFd, &tempReadSet)) &&
900            (!FD_ISSET(highestSetFd, &tempWriteSet)))
901       highestSetFd--;
902     timeout.tv_sec = 1;
903     timeout.tv_usec = 0;
904     res = select(highestSetFd+1, (fd_set *) &tempReadSet, 
905                  (fd_set *) &tempWriteSet, NULL, &timeout);
906     if (res < 0 && errno != EINTR) {
907       perror("select");
908       exit(-1);
909     }
910
911     now = time(NULL);
912
913     /* clear the bit for listen socket to avoid confusion */
914     ClearFd(lisSock, &tempReadSet);
915     
916     ceiling = highestSetFd+1;   /* copy it, since it changes during loop */
917     /* pass data back and forth as needed */
918     for (i = 0; i < ceiling; i++) {
919       if (FD_ISSET(i, &tempWriteSet))
920         SocketReadyToWrite(i);
921     }
922     for (i = 0; i < ceiling; i++) {
923       if (FD_ISSET(i, &tempReadSet))
924         SocketReadyToRead(i);
925     }
926
927     /* see if we need to close conns w/o requests */
928     CloseReqlessConns();
929     
930     /* do all closes */
931     ReallyCloseSocks();
932
933     /* try accepting new connections */
934     do {
935       struct sockaddr_in addr;
936       socklen_t lenAddr = sizeof(addr);
937       if ((newSock = accept(lisSock, (struct sockaddr *) &addr, 
938                             &lenAddr)) >= 0) {
939         /* make socket non-blocking */
940         if (fcntl(newSock, F_SETFL, O_NONBLOCK) < 0) {
941           close(newSock);
942           continue;
943         }
944         memset(&sockInfo[newSock], 0, sizeof(SockInfo));
945         sockInfo[newSock].si_needsHeaderSince = now;
946         numNeedingHeaders++;
947         sockInfo[newSock].si_peerFd = -1;
948         sockInfo[newSock].si_cliAddr = addr.sin_addr;
949         sockInfo[newSock].si_whichService = -1;
950         SetFd(newSock, &masterReadSet);
951       }
952     } while (newSock >= 0);
953   }
954 }
955 /*-----------------------------------------------------------------*/
956 static int 
957 InitDaemon(void)
958 {
959   pid_t pid;
960   FILE *pidfile;
961   
962   pidfile = fopen(PIDFILE, "w");
963   if (pidfile == NULL) {
964     fprintf(stderr, "%s creation failed\n", PIDFILE);
965     return(-1);
966   }
967
968   if ((pid = fork()) < 0) {
969     fclose(pidfile);
970     return(-1);
971   }
972   else if (pid != 0) {
973     /* i'm the parent, writing down the child pid  */
974     fprintf(pidfile, "%u\n", pid);
975     fclose(pidfile);
976     exit(0);
977   }
978
979   /* close the pid file */
980   fclose(pidfile);
981
982   /* routines for any daemon process
983      1. create a new session 
984      2. change directory to the root
985      3. change the file creation permission 
986   */
987   setsid();
988   chdir("/");
989   umask(0);
990
991   return(0);
992 }
993 /*-----------------------------------------------------------------*/
994 static int
995 OpenLogFile(void)
996 {
997   static const char* logfile = "/var/log/codemux.log";
998   static const char* oldlogfile = "/var/log/codemux.log.old";
999   int logfd;
1000
1001   /* if the previous log file exists,
1002      rename it to the oldlogfile */
1003   if (access(logfile, F_OK) == 0) {
1004     if (rename(logfile, oldlogfile) < 0) {
1005       fprintf(stderr, "cannot rotate the logfile err=%s\n",
1006               strerror(errno));
1007       exit(-1);
1008     }
1009   }
1010
1011   logfd = open(logfile, O_WRONLY | O_APPEND | O_CREAT, 0600);
1012   if (logfd < 0) {
1013     fprintf(stderr, "cannot open the logfile err=%s\n",
1014             strerror(errno));
1015     exit(-1);
1016   }
1017
1018   /* duplicate logfile to stderr */
1019   if (dup2(logfd, STDERR_FILENO) != STDERR_FILENO) {
1020     fprintf(stderr, "cannot open the logfile err=%s\n",
1021             strerror(errno));
1022     exit(-1);
1023   }
1024   
1025   /* set the close-on-exec flag */
1026   if (fcntl(STDERR_FILENO, F_SETFD, 1) != 0) {
1027     fprintf(stderr, "fcntl to set the close-on-exec flag failed err=%s\n",
1028             strerror(errno));
1029     exit(-1);
1030   }
1031
1032   return logfd;
1033 }
1034 /*-----------------------------------------------------------------*/
1035 int
1036 main(int argc, char *argv[])
1037 {
1038   int lisSock;
1039   int logFd;
1040
1041   /* do the daemon stuff */
1042   if (argc <= 1 || strcmp(argv[1], "-d") != 0) {
1043     if (InitDaemon() < 0) {
1044       fprintf(stderr, "codemux daemon_init() failed\n");
1045       exit(-1);
1046     }
1047   }
1048
1049   /* create the accept socket */
1050   if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE)) < 0) {
1051     fprintf(stderr, "failed creating accept socket\n");
1052     exit(-1);
1053   }
1054   SetFd(lisSock, &masterReadSet);
1055
1056   /* open the log file */
1057   logFd = OpenLogFile();
1058
1059
1060   /* write down the version */
1061   fprintf(stderr, "CoDemux version %s started\n", CODEMUX_VERSION);
1062
1063   while (1) {
1064     numForks++;
1065     if (fork()) {
1066       /* this is the parent - just wait */
1067       while (wait3(NULL, 0, NULL) < 1)
1068         ;                       /* just keep waiting for a real pid */
1069     }
1070     else {
1071       /* child process */
1072       MainLoop(lisSock);
1073       exit(-1);
1074     }
1075   }
1076 }
1077 /*-----------------------------------------------------------------*/