- fixed the slow memory leak problem.
[codemux.git] / codemux.c
1 #include <sys/types.h>
2 #include <sys/socket.h>
3 #include <sys/stat.h>
4 #include <sys/wait.h>
5 #include <netinet/in.h>
6 #include <arpa/inet.h>
7 #include <ctype.h>
8 #include <errno.h>
9 #include <fcntl.h>
10 #include <signal.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <time.h>
14 #include <unistd.h>
15 #include <string.h>
16 #include "codemuxlib.h"
17 #include "debug.h"
18
19 #ifdef DEBUG
20 HANDLE hdebugLog;
21 int defaultTraceSync;
22 #endif
23
24 #define CONF_FILE "/etc/codemux/codemux.conf"
25 #define DEMUX_PORT 80
26 #define PIDFILE "/var/run/codemux.pid"
27 #define TARG_SETSIZE 4096
28
29 /* set aside some small number of fds for us, allow the rest for
30    connections */
31 #define MAX_CONNS ((TARG_SETSIZE-20)/2)
32
33 /* no single service can take more than half the connections */
34 #define SERVICE_MAX (MAX_CONNS/2)
35
36 /* how many total connections before we get concerned about fairness
37    among them */
38 #define FAIRNESS_CUTOFF (MAX_CONNS * 0.85)
39
40 /* codemux version */
41 #define CODEMUX_VERSION "0.3"
42
43 typedef struct FlowBuf {
44   int fb_refs;                  /* num refs */
45   char *fb_buf;                 /* actual buffer */
46   int fb_used;                  /* bytes used in buffer */
47 } FlowBuf;
48 #define FB_SIZE 3800            /* max usable size */
49 #define FB_ALLOCSIZE 4000       /* extra to include IP address */
50
51 typedef struct SockInfo {
52   int si_peerFd;                /* fd of peer */
53   struct in_addr si_cliAddr;    /* address of client */
54   int si_blocked;               /* are we blocked? */
55   int si_needsHeaderSince;      /* since when are we waiting for a header */
56   int si_whichService;          /* index of service */
57   FlowBuf *si_readBuf;          /* read data into this buffer */
58   FlowBuf *si_writeBuf;         /* drain this buffer for writing */
59 } SockInfo;
60
61 static SockInfo sockInfo[TARG_SETSIZE]; /* fd number of peer socket */
62
63 typedef struct ServiceSig {
64   char *ss_host;                /* suffix in host */
65   char *ss_slice;
66   short ss_port;
67   int ss_slicePos;              /* position in slices array */
68 } ServiceSig;
69
70 static ServiceSig *serviceSig;
71 static int numServices;
72 static int confFileReadTime;
73 static int now;
74
75 typedef struct SliceInfo {
76   char *si_sliceName;
77   int si_inUse;                 /* do any services refer to this? */
78   int si_numConns;
79   int si_xid;
80 } SliceInfo;
81
82 static SliceInfo *slices;
83 static int numSlices;
84 static int numActiveSlices;
85 static int numTotalSliceConns;
86 static int anySliceXidsNeeded;
87
88 typedef struct OurFDSet {
89   long __fds_bits[TARG_SETSIZE/32];
90 } OurFDSet;
91 static OurFDSet masterReadSet, masterWriteSet;
92 static int highestSetFd;
93 static int numNeedingHeaders;   /* how many conns waiting on headers? */
94
95 static int numForks;
96
97 #ifndef SO_SETXID
98 #define SO_SETXID SO_PEERCRED
99 #endif
100 /*-----------------------------------------------------------------*/
101 static SliceInfo *
102 ServiceToSlice(int whichService)
103 {
104   if (whichService < 0)
105     return(NULL);
106   return(&slices[serviceSig[whichService].ss_slicePos]);
107 }
108 /*-----------------------------------------------------------------*/
109 static void
110 DumpStatus(int fd)
111 {
112   char buf[65535];
113   char *start = buf;
114   int i;
115   int len;
116
117   sprintf(start, 
118           "CoDemux version %s\n"
119           "numForks %d, numActiveSlices %d, numTotalSliceConns %d\n"
120           "numNeedingHeaders %d, anySliceXidsNeeded %d\n",
121           CODEMUX_VERSION,
122           numForks, numActiveSlices, numTotalSliceConns,
123           numNeedingHeaders, anySliceXidsNeeded);
124   start += strlen(start);
125
126   for (i = 0; i < numSlices; i++) {
127     SliceInfo *si = &slices[i];
128     sprintf(start, "Slice %d: %s xid %d, %d conns, inUse %d\n", 
129             i, si->si_sliceName, si->si_xid, si->si_numConns,
130             si->si_inUse);
131     start += strlen(start);
132   }
133
134   for (i = 0; i < numServices; i++) {
135     ServiceSig *ss = &serviceSig[i];
136     sprintf(start, "Service %d: %s %s port %d, slice# %d\n", i, ss->ss_host,
137             ss->ss_slice, (int) ss->ss_port, ss->ss_slicePos);
138     start += strlen(start);
139   }
140
141   len = start - buf;
142   write(fd, buf, len);
143 }
144 /*-----------------------------------------------------------------*/
145 static void
146 GetSliceXids(void)
147 {
148   /* walks through /etc/passwd, and gets the uid for every slice we
149      have */
150   FILE *f;
151   char *line;
152   int i;
153
154   if (!anySliceXidsNeeded)
155     return;
156
157   for (i = 0; i < numSlices; i++) {
158     SliceInfo *si = &slices[i];
159     si->si_inUse = 0;
160   }
161   for (i = 0; i < numServices; i++) {
162     SliceInfo *si = ServiceToSlice(i);
163     if (si != NULL)
164       si->si_inUse++;
165   }  
166
167   if ((f = fopen("/etc/passwd", "r")) == NULL)
168     return;
169
170   while ((line = GetNextLine(f)) != NULL) {
171     char *temp;
172     int xid;
173
174     if ((temp = strchr(line, ':')) == NULL)
175       goto next_line;                   /* weird line */
176     *temp = '\0';               /* terminate slice name */
177     temp++;
178     if ((temp = strchr(temp+1, ':')) == NULL)
179       goto next_line;   /* weird line */
180     if ((xid = atoi(temp+1)) < 1)
181       goto next_line;   /* weird xid */
182     
183     /* we've got a slice name and xid, let's try to match */
184     for (i = 0; i < numSlices; i++) {
185       if (slices[i].si_xid == 0 &&
186           strcasecmp(slices[i].si_sliceName, line) == 0) {
187         slices[i].si_xid = xid;
188         break;
189       }
190     }
191   next_line:
192     if (line)
193       xfree(line);
194   }
195
196   /* assume service 0 is the root service, and don't check it since
197      it'll have xid zero */
198   anySliceXidsNeeded = FALSE;
199   for (i = 1; i < numSlices; i++) {
200     if (slices[i].si_xid == 0 && slices[i].si_inUse > 0) {
201       anySliceXidsNeeded = TRUE;
202       break;
203     }
204   }
205
206   fclose(f);
207 }
208 /*-----------------------------------------------------------------*/
209 static void
210 SliceConnsInc(int whichService)
211 {
212   SliceInfo *si = ServiceToSlice(whichService);
213
214   if (si == NULL)
215     return;
216   numTotalSliceConns++;
217   si->si_numConns++;
218   if (si->si_numConns == 1)
219     numActiveSlices++;
220 }
221 /*-----------------------------------------------------------------*/
222 static void
223 SliceConnsDec(int whichService)
224 {
225   SliceInfo *si = ServiceToSlice(whichService);
226
227   if (si == NULL)
228     return;
229   numTotalSliceConns--;
230   si->si_numConns--;
231   if (si->si_numConns == 0)
232     numActiveSlices--;
233 }
234 /*-----------------------------------------------------------------*/
235 static int
236 WhichSlicePos(char *slice)
237 {
238   /* adds the new slice if necessary, returns the index into slice
239      array. Never change the ordering of existing slices */
240   int i;
241   static int numSlicesAlloc;
242
243   for (i = 0; i < numSlices; i++) {
244     if (strcasecmp(slice, slices[i].si_sliceName) == 0)
245       return(i);
246   }
247
248   if (numSlices >= numSlicesAlloc) {
249     numSlicesAlloc = MAX(8, numSlicesAlloc * 2);
250     slices = xrealloc(slices, numSlicesAlloc * sizeof(SliceInfo));
251   }
252
253   memset(&slices[numSlices], 0, sizeof(SliceInfo));
254   slices[numSlices].si_sliceName = xstrdup(slice);
255   numSlices++;
256   return(numSlices-1);
257 }
258 /*-----------------------------------------------------------------*/
259 static void
260 ReadConfFile(void)
261 {
262   int numAlloc = 0;
263   int num = 0;
264   ServiceSig *servs = NULL;
265   FILE *f;
266   char *line = NULL;
267   struct stat statBuf;
268   int i;
269
270   if (stat(CONF_FILE, &statBuf) != 0) {
271     fprintf(stderr, "failed stat on codemux.conf\n");
272     if (numServices)
273       return;
274     exit(-1);
275   }
276   if (statBuf.st_mtime == confFileReadTime)
277     return;
278
279   if ((f = fopen(CONF_FILE, "r")) == NULL) {
280     fprintf(stderr, "failed reading codemux.conf\n");
281     if (numServices)
282       return;
283     exit(-1);
284   }
285
286   /* conf file entries look like
287      coblitz.codeen.org princeton_coblitz 3125
288   */
289
290   while (1) {
291     ServiceSig serv;
292     int port;
293     if (line != NULL)
294       xfree(line);
295     
296     if ((line = GetNextLine(f)) == NULL)
297       break;
298
299     memset(&serv, 0, sizeof(serv));
300     if (WordCount(line) != 3) {
301       fprintf(stderr, "bad line: %s\n", line);
302       continue;
303     }
304     serv.ss_port = port = atoi(GetField(line, 2));
305     if (port < 1 || port > 65535 || port == DEMUX_PORT) {
306       fprintf(stderr, "bad port: %s\n", line);
307       continue;
308     }
309
310     serv.ss_host = GetWord(line, 0);
311     serv.ss_slice = GetWord(line, 1);
312
313     if (num == 0 && /* the first row must be an entry for apache */
314         (strcmp(serv.ss_host, "*") != 0 ||
315          strcmp(serv.ss_slice, "root") != 0)) {
316       fprintf(stderr, "first row has to be for webserver\n");
317       exit(-1);
318     }
319     if (num >= numAlloc) {
320       numAlloc = MAX(numAlloc * 2, 8);
321       servs = xrealloc(servs, numAlloc * sizeof(ServiceSig));
322     }
323     serv.ss_slicePos = WhichSlicePos(serv.ss_slice);
324     if (slices[serv.ss_slicePos].si_inUse == 0 &&
325         slices[serv.ss_slicePos].si_xid < 1)
326       anySliceXidsNeeded = TRUE; /* if new/inactive, we need xid */
327     servs[num] = serv;
328     num++;
329   }
330
331   fclose(f);
332
333   if (num == 1) {
334     if (numServices == 0) {
335       fprintf(stderr, "nothing found in codemux.conf\n");
336       exit(-1);
337     }
338     return;
339   }
340
341   for (i = 0; i < numServices; i++) {
342     xfree(serviceSig[i].ss_host);
343     xfree(serviceSig[i].ss_slice);
344   }
345   xfree(serviceSig);
346   serviceSig = servs;
347   numServices = num;
348   confFileReadTime = statBuf.st_mtime;
349 }
350 /*-----------------------------------------------------------------*/
351 static char *err400BadRequest =
352 "HTTP/1.0 400 Bad Request\r\n"
353 "Content-Type: text/html\r\n"
354 "\r\n"
355 "You are trying to access a PlanetLab node, and your\n"
356 "request header exceeded the allowable size. Please\n"
357 "try again if you believe this error is temporary.\n";
358 /*-----------------------------------------------------------------*/
359 static char *err503Unavailable =
360 "HTTP/1.0 503 Service Unavailable\r\n"
361 "Content-Type: text/html\r\n"
362 "\r\n"
363 "You are trying to access a PlanetLab node, but the service\n"
364 "seems to be unavailable at the moment. Please try again.\n";
365 /*-----------------------------------------------------------------*/
366 static char *err503TooBusy =
367 "HTTP/1.0 503 Service Unavailable\r\n"
368 "Content-Type: text/html\r\n"
369 "\r\n"
370 "You are trying to access a PlanetLab node, but the service\n"
371 "seems to be overloaded at the moment. Please try again.\n";
372 /*-----------------------------------------------------------------*/
373 static void
374 SetFd(int fd, OurFDSet *set)
375 {
376   if (highestSetFd < fd)
377     highestSetFd = fd;
378   FD_SET(fd, set);
379 }
380 /*-----------------------------------------------------------------*/
381 static void
382 ClearFd(int fd, OurFDSet *set)
383 {
384   FD_CLR(fd, set);
385 }
386 /*-----------------------------------------------------------------*/
387 static int
388 RemoveHeader(char *lower, char *real, int totalSize, char *header)
389 {
390   /* returns number of characters removed */
391   char h2[256];
392   int start, end, len;
393   char *temp, *conn;
394
395   sprintf(h2, "\n%s", header);
396
397   if ((conn = strstr(lower, h2)) == NULL)
398     return(0);
399
400   conn++;
401   /* determine how many characters to remove */
402   if ((temp = strchr(conn, '\n')) != NULL)
403     len = (temp - conn) + 1;
404   else
405     len = strlen(conn) + 1;
406   start = conn - lower;
407   end = start + len;
408   memmove(&real[start], &real[end], totalSize - end);
409   memmove(&lower[start], &lower[end], totalSize - end);
410
411   return(len);
412 }
413 /*-----------------------------------------------------------------*/
414 static int
415 InsertHeader(char *buf, int totalSize, char *header)
416 {
417   /* returns number of bytes inserted */
418   
419   char h2[256];
420   char *temp;
421   int len;
422   
423   sprintf(h2, "%s\r\n", header);
424   len = strlen(h2);
425
426   /* if we don't encounter a \n, it means that we have only a single
427      line, and we'd converted the \n to a \0 */
428   if ((temp = strchr(buf, '\n')) == NULL)
429     temp = strchr(buf, '\0');
430   temp++;
431   
432   memmove(temp + len, temp, totalSize - (temp - buf));
433   memcpy(temp, h2, len);
434   
435   return(len);
436 }
437 /*-----------------------------------------------------------------*/
438 static int
439 FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
440 {
441   char *end;
442   char *lowerBuf;
443   char *hostVal;
444   char *buf = fb->fb_buf;
445   char orig[256];
446 #if 0
447   char *url;
448   int i;
449   int len;
450 #endif
451     
452   if (strstr(buf, "\n\r\n") == NULL && strstr(buf, "\n\n") == NULL)
453     return(FAILURE);
454
455   /* insert client info after first line */
456   sprintf(orig, "X-CoDemux-Client: %s", inet_ntoa(addr));
457   fb->fb_used += InsertHeader(buf, fb->fb_used + 1, orig);
458     
459   /* get just the header, so we can work on it */
460   LOCAL_STR_DUP_LOWER(lowerBuf, buf);
461   if ((end = strstr(lowerBuf, "\n\r\n")) == NULL)
462     end = strstr(lowerBuf, "\n\n");
463   *end = '\0';
464   
465   /* remove any existing connection, keep-alive headers, add ours */
466   fb->fb_used -= RemoveHeader(lowerBuf, buf, fb->fb_used + 1, "keep-alive:");
467   fb->fb_used -= RemoveHeader(lowerBuf, buf, fb->fb_used + 1, "connection:");
468   fb->fb_used += InsertHeader(buf, fb->fb_used + 1, "Connection: close");
469   InsertHeader(lowerBuf, fb->fb_used + 1, "connection: close");
470
471   /* isolate host, see if it matches */
472   if ((hostVal = strstr(lowerBuf, "\nhost:")) != NULL) {
473     int i;
474     hostVal += strlen("\nhost:");
475     if ((end = strchr(hostVal, '\n')) != NULL)
476       *end = '\0';
477     if ((end = strchr(hostVal, ':')) != NULL)
478       *end = '\0';
479     while (isspace(*hostVal))
480       hostVal++;
481     if (strlen(hostVal) > 0) {
482       hostVal = GetWord(hostVal, 0);
483       for (i = 1; i < numServices; i++) {
484         if (serviceSig[i].ss_host != NULL &&
485             DoesDotlessSuffixMatch(hostVal, 0, serviceSig[i].ss_host)) {
486           *whichService = i;
487           free(hostVal);
488           /* printf("%s", buf); */
489           return(SUCCESS);
490         }
491       }
492       free(hostVal);
493     }
494   }
495
496 #if 0
497   /* see if URL prefix matches */
498   if ((end = strchr(lowerBuf, '\n')) != NULL)
499     *end = 0;
500   if ((url = GetField(lowerBuf, 1)) == NULL ||
501       url[0] != '/') {
502     /* bad request - let apache handle it ? */
503     *whichService = 0;
504     return(SUCCESS);
505   }
506   url++;                        /* skip the leading slash */
507   for (i = 1; i < numServices; i++) {
508     if (serviceSig[i].ss_prefix != NULL &&
509         (len = strlen(serviceSig[i].ss_prefix)) > 0 &&
510         strncmp(url, serviceSig[i].ss_prefix, len) == 0 &&
511         (url[len] == ' ' || url[len] == '/')) {
512       int startPos = url - lowerBuf;
513       int stripLen = len + ((url[len] == '/') ? 1 : 0);
514       /* strip out prefix */
515       fb->fb_used -= stripLen;
516       memmove(&buf[startPos], &buf[startPos+stripLen], 
517               fb->fb_used + 1 - startPos);
518       /* printf("%s", buf); */
519       *whichService = i;
520       return(SUCCESS);
521     }
522   }
523 #endif
524
525   /* default to first service */
526   *whichService = 0;
527   return(SUCCESS);
528 }
529 /*-----------------------------------------------------------------*/
530 static int
531 StartConnect(int origFD, int whichService)
532 {
533   int sock;
534   struct sockaddr_in dest;
535   SockInfo *si;
536
537   /* create socket */
538   if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
539     return(FAILURE);
540   }
541   
542   /* make socket non-blocking */
543   if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) {
544     close(sock);
545     return(FAILURE);
546   }
547   
548   /* set addr structure */
549   memset(&dest, 0, sizeof(dest));
550   dest.sin_family = AF_INET;
551   dest.sin_port = htons(serviceSig[whichService].ss_port);
552   dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
553   
554   /* start connection process - we should be told that it's in
555      progress */
556   if (connect(sock, (struct sockaddr *) &dest, sizeof(dest)) != -1 || 
557       errno != EINPROGRESS) {
558     close(sock);
559     return(FAILURE);
560   }
561
562   SetFd(sock, &masterWriteSet); /* determine when connect finishes */
563   sockInfo[origFD].si_peerFd = sock;
564   si = &sockInfo[sock];
565   memset(si, 0, sizeof(SockInfo));
566   si->si_peerFd = origFD;
567   si->si_blocked = TRUE;        /* still connecting */
568   si->si_whichService = whichService;
569   si->si_writeBuf = sockInfo[origFD].si_readBuf;
570   sockInfo[origFD].si_readBuf->fb_refs++;
571   if (whichService >= 0)
572     SliceConnsInc(whichService);
573
574   return(SUCCESS);
575 }
576 /*-----------------------------------------------------------------*/
577 static int
578 WriteAvailData(int fd)
579 {
580   SockInfo *si = &sockInfo[fd];
581   FlowBuf *fb = si->si_writeBuf;
582   int res;
583
584   /* printf("trying to write fd %d\n", fd); */
585   if (fb->fb_used < 1 || si->si_blocked)
586     return(SUCCESS);
587
588   /* printf("trying to write %d bytes\n", fb->fb_used); */
589   /* write(STDOUT_FILENO, fb->fb_buf, fb->fb_used); */
590   if ((res = write(fd, fb->fb_buf, fb->fb_used)) > 0) {
591     fb->fb_used -= res;
592     if (fb->fb_used > 0) {
593       /* couldn't write all - assume blocked */
594       memmove(fb->fb_buf, &fb->fb_buf[res], fb->fb_used);
595       si->si_blocked = TRUE;
596       SetFd(fd, &masterWriteSet);
597     }
598     /* printf("wrote %d\n", res); */
599     return(SUCCESS);
600   }
601
602   /* we might have been full but didn't realize it */
603   if (res == -1 && errno == EAGAIN) {
604     si->si_blocked = TRUE;
605     SetFd(fd, &masterWriteSet);
606     return(SUCCESS);
607   }
608
609   /* otherwise, assume the worst */
610   return(FAILURE);
611 }
612 /*-----------------------------------------------------------------*/
613 static OurFDSet socksToCloseVec;
614 static int numSocksToClose;
615 static int whichSocksToClose[TARG_SETSIZE];
616 /*-----------------------------------------------------------------*/
617 static void
618 CloseSock(int fd)
619 {
620   if (FD_ISSET(fd, &socksToCloseVec))
621     return;
622   SetFd(fd, &socksToCloseVec);
623   whichSocksToClose[numSocksToClose] = fd;
624   numSocksToClose++;
625 }
626 /*-----------------------------------------------------------------*/
627 static void
628 DecBuf(FlowBuf *buf)
629 {
630   if (buf == NULL)
631     return;
632   buf->fb_refs--;
633   if (buf->fb_refs == 0) {
634     free(buf->fb_buf);
635     free(buf);
636   }
637 }
638 /*-----------------------------------------------------------------*/
639 static void
640 ReallyCloseSocks(void)
641 {
642   int i;
643
644   memset(&socksToCloseVec, 0, sizeof(socksToCloseVec));
645
646   for (i = 0; i < numSocksToClose; i++) {
647     int fd = whichSocksToClose[i];
648     close(fd);
649     DecBuf(sockInfo[fd].si_readBuf);
650     DecBuf(sockInfo[fd].si_writeBuf);
651     ClearFd(fd, &masterReadSet);
652     ClearFd(fd, &masterWriteSet);
653     if (sockInfo[fd].si_needsHeaderSince) {
654       sockInfo[fd].si_needsHeaderSince = 0;
655       numNeedingHeaders--;
656     }
657     if (sockInfo[fd].si_whichService >= 0) {
658       SliceConnsDec(sockInfo[fd].si_whichService);
659       sockInfo[fd].si_whichService = -1;
660     }
661     /* KyoungSoo*/
662     if (sockInfo[fd].si_peerFd >= 0) {
663       sockInfo[sockInfo[fd].si_peerFd].si_peerFd = -1;
664     }
665   }
666   numSocksToClose = 0;
667 }
668 /*-----------------------------------------------------------------*/
669 static void
670 SocketReadyToRead(int fd)
671 {
672   SockInfo *si = &sockInfo[fd];
673   int spaceLeft;
674   FlowBuf *fb;
675   int res;
676
677   /* if peer is closed, close ourselves */
678   if (si->si_peerFd < 0 && (!si->si_needsHeaderSince)) {
679     CloseSock(fd);
680     return;
681   }
682
683   if ((fb = si->si_readBuf) == NULL) {
684     fb = si->si_readBuf = xcalloc(1, sizeof(FlowBuf));
685     fb->fb_refs = 1;
686     if (si->si_peerFd >= 0) {
687       sockInfo[si->si_peerFd].si_writeBuf = fb;
688       fb->fb_refs = 2;
689     }
690   }
691
692   if (fb->fb_buf == NULL)
693     fb->fb_buf = xmalloc(FB_ALLOCSIZE);
694
695   /* determine read buffer size - if 0, then block reads and return */
696   if ((spaceLeft = FB_SIZE - fb->fb_used) <= 0) {
697     if (si->si_needsHeaderSince) {
698       write(fd, err400BadRequest, strlen(err400BadRequest));
699       CloseSock(fd);
700       return;
701     }
702     else {
703       ClearFd(fd, &masterReadSet);
704       return;
705     }
706   } 
707   
708   /* read as much as allowed, and is available */
709   if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) {
710     CloseSock(fd);
711     if (fb->fb_used == 0 && si->si_peerFd >= 0) {
712       CloseSock(si->si_peerFd);
713       si->si_peerFd = -1;
714     }
715     return;
716   }
717   if (res == -1) {
718     if (errno == EAGAIN)
719       return;
720     TRACE("fd=%d errno=%d errstr=%s\n",fd, errno, strerror(errno));
721     CloseSock(fd);
722     if (fb->fb_used == 0 && si->si_peerFd >= 0) {
723       CloseSock(si->si_peerFd);
724       si->si_peerFd = -1;
725     }
726     return;
727   }
728   fb->fb_used += res;
729   fb->fb_buf[fb->fb_used] = 0;  /* terminate it for convenience */
730   //  printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
731
732   /* if we need header, check if we've gotten it. if so, do
733      modifications and continue. if not, check if we've read the
734      maximum, and if so, fail */
735   if (si->si_needsHeaderSince) {
736     int whichService;
737     SliceInfo *slice;
738
739 #define STATUS_REQ "GET /codemux/status.txt"
740     if (strncasecmp(fb->fb_buf, STATUS_REQ, sizeof(STATUS_REQ)-1) == 0) {
741       DumpStatus(fd);
742       CloseSock(fd);
743       return;
744     }
745
746     //    printf("trying to find service\n");
747     if (FindService(fb, &whichService, si->si_cliAddr) != SUCCESS)
748       return;
749     //    printf("found service %d\n", whichService);
750     slice = ServiceToSlice(whichService);
751
752     /* no service can have more than some absolute max number of
753        connections. Also, when we're too busy, start enforcing
754        fairness across the servers */
755     if (slice->si_numConns > SERVICE_MAX ||
756         (numTotalSliceConns > FAIRNESS_CUTOFF && 
757          slice->si_numConns > MAX_CONNS/numActiveSlices)) {
758       write(fd, err503TooBusy, strlen(err503TooBusy));
759       TRACE("CloseSock(): fd=%d too busy\n", fd);
760       CloseSock(fd);
761       return;
762     }
763
764     if (slice->si_xid > 0) {
765       static int first = 1;
766       setsockopt(fd, SOL_SOCKET, SO_SETXID, 
767                  &slice->si_xid, sizeof(slice->si_xid));
768       if (first) {
769         /* just to log it for once */
770         fprintf(stderr, "setsockopt() with XID = %d name = %s\n", 
771                 slice->si_xid, slice->si_sliceName);
772         first = 0;
773       }
774     }
775
776     si->si_needsHeaderSince = 0;
777     numNeedingHeaders--;
778     if (StartConnect(fd, whichService) != SUCCESS) {
779       write(fd, err503Unavailable, strlen(err503Unavailable));
780       TRACE("CloseSock(): fd=%d StartConnect() failed\n", fd);
781       CloseSock(fd);
782       return;
783     }
784     return;
785   }
786
787   /* write anything possible */
788   if (WriteAvailData(si->si_peerFd) != SUCCESS) {
789     /* assume the worst and close */
790     TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
791           fd, errno, strerror(errno));
792     CloseSock(fd);
793     if (si->si_peerFd >=0) {
794       CloseSock(si->si_peerFd);
795       si->si_peerFd = -1;
796     }
797   }
798 }
799 /*-----------------------------------------------------------------*/
800 static void
801 SocketReadyToWrite(int fd)
802 {
803   SockInfo *si = &sockInfo[fd];
804
805   /* unblock it and read what it has */
806   si->si_blocked = FALSE;
807   ClearFd(fd, &masterWriteSet);
808   SetFd(fd, &masterReadSet);
809   
810   /* enable reading on peer just in case it was off */
811   if (si->si_peerFd >= 0)
812     SetFd(si->si_peerFd, &masterReadSet);
813     
814   /* if we have data, write it */
815   if (WriteAvailData(fd) != SUCCESS) {
816    /* assume the worst and close */
817     TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
818           fd, errno, strerror(errno));
819     CloseSock(fd);
820     if (si->si_peerFd >= 0) {
821       CloseSock(si->si_peerFd);
822       si->si_peerFd = -1;
823     }
824     return;
825   }
826
827   /* if peer is closed and we're done writing, we should close */
828   if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0) {
829     CloseSock(fd);
830   }
831 }
832 /*-----------------------------------------------------------------*/
833 static void
834 CloseReqlessConns(void)
835 {
836   static int lastSweep;
837   int maxAge;
838   int i;
839
840   if (lastSweep == now)
841     return;
842   lastSweep = now;
843
844   if (numTotalSliceConns + numNeedingHeaders > MAX_CONNS ||
845       numNeedingHeaders > TARG_SETSIZE/20) {
846     /* second condition is probably an attack - close aggressively */
847     maxAge = 5;
848   }
849   else if (numTotalSliceConns + numNeedingHeaders > FAIRNESS_CUTOFF ||
850            numNeedingHeaders > TARG_SETSIZE/40) {
851     /* sweep a little aggressively */
852     maxAge = 10;
853   }
854   else if (numNeedingHeaders > TARG_SETSIZE/80) {
855     /* just sweep to close strays */
856     maxAge = 30;
857   }
858   else {
859     /* too little gained - not worth sweeping */
860     return;
861   }
862
863   /* if it's too old, close it */
864   for (i = 0; i < highestSetFd+1; i++) {
865     if (sockInfo[i].si_needsHeaderSince &&
866         (now - sockInfo[i].si_needsHeaderSince) > maxAge) 
867       CloseSock(i);
868   }
869 }
870 /*-----------------------------------------------------------------*/
871 static void
872 MainLoop(int lisSock)
873 {
874   int i;
875   OurFDSet tempReadSet, tempWriteSet;
876   int res;
877   int lastConfCheck = 0;
878
879   signal(SIGPIPE, SIG_IGN);
880
881   while (1) {
882     int newSock;
883     int ceiling;
884     struct timeval timeout;
885
886     now = time(NULL);
887
888     if (now - lastConfCheck > 300) {
889       ReadConfFile();
890       GetSliceXids();           /* always call - in case new slices created */
891       lastConfCheck = now;
892     }
893
894     /* see if there's any activity */
895     tempReadSet = masterReadSet;
896     tempWriteSet = masterWriteSet;
897
898     /* trim it down if needed */
899     while (highestSetFd > 1 &&
900            (!FD_ISSET(highestSetFd, &tempReadSet)) &&
901            (!FD_ISSET(highestSetFd, &tempWriteSet)))
902       highestSetFd--;
903     timeout.tv_sec = 1;
904     timeout.tv_usec = 0;
905     res = select(highestSetFd+1, (fd_set *) &tempReadSet, 
906                  (fd_set *) &tempWriteSet, NULL, &timeout);
907     if (res < 0 && errno != EINTR) {
908       perror("select");
909       exit(-1);
910     }
911
912     now = time(NULL);
913
914     /* clear the bit for listen socket to avoid confusion */
915     ClearFd(lisSock, &tempReadSet);
916     
917     ceiling = highestSetFd+1;   /* copy it, since it changes during loop */
918     /* pass data back and forth as needed */
919     for (i = 0; i < ceiling; i++) {
920       if (FD_ISSET(i, &tempWriteSet))
921         SocketReadyToWrite(i);
922     }
923     for (i = 0; i < ceiling; i++) {
924       if (FD_ISSET(i, &tempReadSet))
925         SocketReadyToRead(i);
926     }
927
928     /* see if we need to close conns w/o requests */
929     CloseReqlessConns();
930     
931     /* do all closes */
932     ReallyCloseSocks();
933
934     /* try accepting new connections */
935     do {
936       struct sockaddr_in addr;
937       socklen_t lenAddr = sizeof(addr);
938       if ((newSock = accept(lisSock, (struct sockaddr *) &addr, 
939                             &lenAddr)) >= 0) {
940         /* make socket non-blocking */
941         if (fcntl(newSock, F_SETFL, O_NONBLOCK) < 0) {
942           close(newSock);
943           continue;
944         }
945         memset(&sockInfo[newSock], 0, sizeof(SockInfo));
946         sockInfo[newSock].si_needsHeaderSince = now;
947         numNeedingHeaders++;
948         sockInfo[newSock].si_peerFd = -1;
949         sockInfo[newSock].si_cliAddr = addr.sin_addr;
950         sockInfo[newSock].si_whichService = -1;
951         SetFd(newSock, &masterReadSet);
952       }
953     } while (newSock >= 0);
954   }
955 }
956 /*-----------------------------------------------------------------*/
957 static int 
958 InitDaemon(void)
959 {
960   pid_t pid;
961   FILE *pidfile;
962   
963   pidfile = fopen(PIDFILE, "w");
964   if (pidfile == NULL) {
965     fprintf(stderr, "%s creation failed\n", PIDFILE);
966     return(-1);
967   }
968
969   if ((pid = fork()) < 0) {
970     fclose(pidfile);
971     return(-1);
972   }
973   else if (pid != 0) {
974     /* i'm the parent, writing down the child pid  */
975     fprintf(pidfile, "%u\n", pid);
976     fclose(pidfile);
977     exit(0);
978   }
979
980   /* close the pid file */
981   fclose(pidfile);
982
983   /* routines for any daemon process
984      1. create a new session 
985      2. change directory to the root
986      3. change the file creation permission 
987   */
988   setsid();
989   chdir("/");
990   umask(0);
991
992   return(0);
993 }
994 /*-----------------------------------------------------------------*/
995 static int
996 OpenLogFile(void)
997 {
998   static const char* logfile = "/var/log/codemux.log";
999   static const char* oldlogfile = "/var/log/codemux.log.old";
1000   int logfd;
1001
1002   /* if the previous log file exists,
1003      rename it to the oldlogfile */
1004   if (access(logfile, F_OK) == 0) {
1005     if (rename(logfile, oldlogfile) < 0) {
1006       fprintf(stderr, "cannot rotate the logfile err=%s\n",
1007               strerror(errno));
1008       exit(-1);
1009     }
1010   }
1011
1012   logfd = open(logfile, O_WRONLY | O_APPEND | O_CREAT, 0600);
1013   if (logfd < 0) {
1014     fprintf(stderr, "cannot open the logfile err=%s\n",
1015             strerror(errno));
1016     exit(-1);
1017   }
1018
1019   /* duplicate logfile to stderr */
1020   if (dup2(logfd, STDERR_FILENO) != STDERR_FILENO) {
1021     fprintf(stderr, "cannot open the logfile err=%s\n",
1022             strerror(errno));
1023     exit(-1);
1024   }
1025   
1026   /* set the close-on-exec flag */
1027   if (fcntl(STDERR_FILENO, F_SETFD, 1) != 0) {
1028     fprintf(stderr, "fcntl to set the close-on-exec flag failed err=%s\n",
1029             strerror(errno));
1030     exit(-1);
1031   }
1032
1033   return logfd;
1034 }
1035 /*-----------------------------------------------------------------*/
1036 int
1037 main(int argc, char *argv[])
1038 {
1039   int lisSock;
1040   int logFd;
1041
1042   /* do the daemon stuff */
1043   if (argc <= 1 || strcmp(argv[1], "-d") != 0) {
1044     if (InitDaemon() < 0) {
1045       fprintf(stderr, "codemux daemon_init() failed\n");
1046       exit(-1);
1047     }
1048   }
1049
1050   /* create the accept socket */
1051   if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE)) < 0) {
1052     fprintf(stderr, "failed creating accept socket\n");
1053     exit(-1);
1054   }
1055   SetFd(lisSock, &masterReadSet);
1056
1057   /* open the log file */
1058   logFd = OpenLogFile();
1059
1060
1061   /* write down the version */
1062   fprintf(stderr, "CoDemux version %s started\n", CODEMUX_VERSION);
1063
1064   while (1) {
1065     numForks++;
1066     if (fork()) {
1067       /* this is the parent - just wait */
1068       while (wait3(NULL, 0, NULL) < 1)
1069         ;                       /* just keep waiting for a real pid */
1070     }
1071     else {
1072       /* child process */
1073       MainLoop(lisSock);
1074       exit(-1);
1075     }
1076   }
1077 }
1078 /*-----------------------------------------------------------------*/