7d55e0d21698d07ab83e4eea6597daebcff1200c
[codemux.git] / codemux.c
1 #include <sys/types.h>
2 #include <sys/socket.h>
3 #include <sys/stat.h>
4 #include <sys/wait.h>
5 #include <netinet/in.h>
6 #include <arpa/inet.h>
7 #include <ctype.h>
8 #include <errno.h>
9 #include <fcntl.h>
10 #include <signal.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <time.h>
14 #include <unistd.h>
15 #include <string.h>
16 #include "codemuxlib.h"
17 #include "debug.h"
18
19 #ifdef DEBUG
20 HANDLE hdebugLog;
21 int defaultTraceSync;
22 #endif
23
24 #define CONF_FILE "/etc/codemux/codemux.conf"
25 #define DEMUX_PORT 80
26 #define PIDFILE "/var/run/codemux.pid"
27 #define TARG_SETSIZE 4096
28
29 /* set aside some small number of fds for us, allow the rest for
30    connections */
31 #define MAX_CONNS ((TARG_SETSIZE-20)/2)
32
33 /* no single service can take more than half the connections */
34 #define SERVICE_MAX (MAX_CONNS/2)
35
36 /* how many total connections before we get concerned about fairness
37    among them */
38 #define FAIRNESS_CUTOFF (MAX_CONNS * 0.85)
39
40 /* codemux version, from Makefile, or specfile */
41 #define CODEMUX_VERSION RPM_VERSION
42
43 typedef struct FlowBuf {
44   int fb_refs;                  /* num refs */
45   char *fb_buf;                 /* actual buffer */
46   int fb_used;                  /* bytes used in buffer */
47 } FlowBuf;
48 #define FB_SIZE 3800            /* max usable size */
49 #define FB_ALLOCSIZE 4000       /* extra to include IP address */
50
51 typedef struct SockInfo {
52   int si_peerFd;                /* fd of peer */
53   struct in_addr si_cliAddr;    /* address of client */
54   int si_blocked;               /* are we blocked? */
55   int si_needsHeaderSince;      /* since when are we waiting for a header */
56   int si_whichService;          /* index of service */
57   FlowBuf *si_readBuf;          /* read data into this buffer */
58   FlowBuf *si_writeBuf;         /* drain this buffer for writing */
59 } SockInfo;
60
61 static SockInfo sockInfo[TARG_SETSIZE]; /* fd number of peer socket */
62
63 typedef struct ServiceSig {
64   char *ss_host;                /* suffix in host */
65   char *ss_slice;
66   short ss_port;
67   int ss_slicePos;              /* position in slices array */
68 } ServiceSig;
69
70 static ServiceSig *serviceSig;
71 static int numServices;
72 static int confFileReadTime;
73 static int now;
74
75 typedef struct SliceInfo {
76   char *si_sliceName;
77   int si_inUse;                 /* do any services refer to this? */
78   int si_numConns;
79   int si_xid;
80 } SliceInfo;
81
82 static SliceInfo *slices;
83 static int numSlices;
84 static int numActiveSlices;
85 static int numTotalSliceConns;
86 static int anySliceXidsNeeded;
87
88 typedef struct OurFDSet {
89   long __fds_bits[TARG_SETSIZE/32];
90 } OurFDSet;
91 static OurFDSet masterReadSet, masterWriteSet;
92 static int highestSetFd;
93 static int numNeedingHeaders;   /* how many conns waiting on headers? */
94
95 static int numForks;
96
97 /* PLC netflow domain name like netflow.planet-lab.org */
98 static char* domainNamePLCNetflow = NULL;
99
100 #ifndef SO_SETXID
101 #define SO_SETXID SO_PEERCRED
102 #endif
103 /*-----------------------------------------------------------------*/
104 static SliceInfo *
105 ServiceToSlice(int whichService)
106 {
107   if (whichService < 0)
108     return(NULL);
109   return(&slices[serviceSig[whichService].ss_slicePos]);
110 }
111 /*-----------------------------------------------------------------*/
112 static void
113 DumpStatus(int fd)
114 {
115   char buf[65535];
116   char *start = buf;
117   int i;
118   int len;
119
120   sprintf(start, 
121           "CoDemux version %s\n"
122           "numForks %d, numActiveSlices %d, numTotalSliceConns %d\n"
123           "numNeedingHeaders %d, anySliceXidsNeeded %d\n",
124           CODEMUX_VERSION,
125           numForks, numActiveSlices, numTotalSliceConns,
126           numNeedingHeaders, anySliceXidsNeeded);
127   start += strlen(start);
128
129   for (i = 0; i < numSlices; i++) {
130     SliceInfo *si = &slices[i];
131     sprintf(start, "Slice %d: %s xid %d, %d conns, inUse %d\n", 
132             i, si->si_sliceName, si->si_xid, si->si_numConns,
133             si->si_inUse);
134     start += strlen(start);
135   }
136
137   for (i = 0; i < numServices; i++) {
138     ServiceSig *ss = &serviceSig[i];
139     sprintf(start, "Service %d: %s %s port %d, slice# %d\n", i, ss->ss_host,
140             ss->ss_slice, (int) ss->ss_port, ss->ss_slicePos);
141     start += strlen(start);
142   }
143
144   len = start - buf;
145   write(fd, buf, len);
146 }
147 /*-----------------------------------------------------------------*/
148 static void
149 GetSliceXids(void)
150 {
151   /* walks through /etc/passwd, and gets the uid for every slice we
152      have */
153   FILE *f;
154   char *line;
155   int i;
156
157   if (!anySliceXidsNeeded)
158     return;
159
160   for (i = 0; i < numSlices; i++) {
161     SliceInfo *si = &slices[i];
162     si->si_inUse = 0;
163   }
164   for (i = 0; i < numServices; i++) {
165     SliceInfo *si = ServiceToSlice(i);
166     if (si != NULL)
167       si->si_inUse++;
168   }  
169
170   if ((f = fopen("/etc/passwd", "r")) == NULL)
171     return;
172
173   while ((line = GetNextLine(f)) != NULL) {
174     char *temp;
175     int xid;
176
177     if ((temp = strchr(line, ':')) == NULL)
178       goto next_line;                   /* weird line */
179     *temp = '\0';               /* terminate slice name */
180     temp++;
181     if ((temp = strchr(temp+1, ':')) == NULL)
182       goto next_line;   /* weird line */
183     if ((xid = atoi(temp+1)) < 1)
184       goto next_line;   /* weird xid */
185     
186     /* we've got a slice name and xid, let's try to match */
187     for (i = 0; i < numSlices; i++) {
188       if (slices[i].si_xid == 0 &&
189           strcasecmp(slices[i].si_sliceName, line) == 0) {
190         slices[i].si_xid = xid;
191         break;
192       }
193     }
194   next_line:
195     if (line)
196       xfree(line);
197   }
198
199   /* assume service 0 is the root service, and don't check it since
200      it'll have xid zero */
201   anySliceXidsNeeded = FALSE;
202   for (i = 1; i < numSlices; i++) {
203     if (slices[i].si_xid == 0 && slices[i].si_inUse > 0) {
204       anySliceXidsNeeded = TRUE;
205       break;
206     }
207   }
208
209   fclose(f);
210 }
211 /*-----------------------------------------------------------------*/
212 static void
213 SliceConnsInc(int whichService)
214 {
215   SliceInfo *si = ServiceToSlice(whichService);
216
217   if (si == NULL)
218     return;
219   numTotalSliceConns++;
220   si->si_numConns++;
221   if (si->si_numConns == 1)
222     numActiveSlices++;
223 }
224 /*-----------------------------------------------------------------*/
225 static void
226 SliceConnsDec(int whichService)
227 {
228   SliceInfo *si = ServiceToSlice(whichService);
229
230   if (si == NULL)
231     return;
232   numTotalSliceConns--;
233   si->si_numConns--;
234   if (si->si_numConns == 0)
235     numActiveSlices--;
236 }
237 /*-----------------------------------------------------------------*/
238 static int
239 WhichSlicePos(char *slice)
240 {
241   /* adds the new slice if necessary, returns the index into slice
242      array. Never change the ordering of existing slices */
243   int i;
244   static int numSlicesAlloc;
245
246   for (i = 0; i < numSlices; i++) {
247     if (strcasecmp(slice, slices[i].si_sliceName) == 0)
248       return(i);
249   }
250
251   if (numSlices >= numSlicesAlloc) {
252     numSlicesAlloc = MAX(8, numSlicesAlloc * 2);
253     slices = xrealloc(slices, numSlicesAlloc * sizeof(SliceInfo));
254   }
255
256   memset(&slices[numSlices], 0, sizeof(SliceInfo));
257   slices[numSlices].si_sliceName = xstrdup(slice);
258   numSlices++;
259   return(numSlices-1);
260 }
261 /*-----------------------------------------------------------------*/
262 static void
263 ReadConfFile(void)
264 {
265   int numAlloc = 0;
266   int num = 0;
267   ServiceSig *servs = NULL;
268   FILE *f;
269   char *line = NULL;
270   struct stat statBuf;
271   int i;
272
273   if (stat(CONF_FILE, &statBuf) != 0) {
274     fprintf(stderr, "failed stat on codemux.conf\n");
275     if (numServices)
276       return;
277     exit(-1);
278   }
279   if (statBuf.st_mtime == confFileReadTime)
280     return;
281
282   if ((f = fopen(CONF_FILE, "r")) == NULL) {
283     fprintf(stderr, "failed reading codemux.conf\n");
284     if (numServices)
285       return;
286     exit(-1);
287   }
288
289   /* conf file entries look like
290      coblitz.codeen.org princeton_coblitz 3125
291   */
292
293   while (1) {
294     ServiceSig serv;
295     int port;
296     if (line != NULL)
297       xfree(line);
298     
299     if ((line = GetNextLine(f)) == NULL)
300       break;
301
302     memset(&serv, 0, sizeof(serv));
303     if (WordCount(line) < 3) {
304       fprintf(stderr, "bad line: %s\n", line);
305       continue;
306     }
307     serv.ss_port = port = atoi(GetField(line, 2));
308     if (port < 1 || port > 65535 || port == DEMUX_PORT) {
309       fprintf(stderr, "bad port: %s\n", line);
310       continue;
311     }
312
313     serv.ss_host = GetWord(line, 0);
314     serv.ss_slice = GetWord(line, 1);
315
316     if (num == 0) {
317       /* the first row must be an entry for apache */
318       if (strcmp(serv.ss_host, "*") != 0 ||
319           strcmp(serv.ss_slice, "root") != 0) {
320         fprintf(stderr, "first row has to be for webserver\n");
321         exit(-1);
322       }
323       /* see if there's PLC netflow's domain name */
324       if (domainNamePLCNetflow != NULL) {
325         xfree(domainNamePLCNetflow);
326         domainNamePLCNetflow = NULL;
327       }
328       domainNamePLCNetflow = GetWord(line, 3);
329     }
330     if (num >= numAlloc) {
331       numAlloc = MAX(numAlloc * 2, 8);
332       servs = xrealloc(servs, numAlloc * sizeof(ServiceSig));
333     }
334     serv.ss_slicePos = WhichSlicePos(serv.ss_slice);
335     if (slices[serv.ss_slicePos].si_inUse == 0 &&
336         slices[serv.ss_slicePos].si_xid < 1)
337       anySliceXidsNeeded = TRUE; /* if new/inactive, we need xid */
338     servs[num] = serv;
339     num++;
340   }
341
342   fclose(f);
343
344 #if 0
345   /* Faiyaz asked me to allow a single-entry codemux conf */
346   if (num == 1) {
347     if (numServices == 0) {
348       fprintf(stderr, "nothing found in codemux.conf\n");
349       exit(-1);
350     }
351     return;
352   }
353 #endif
354   if (num < 1) {
355     fprintf(stderr, "no entry found in codemux.conf\n");
356     exit(-1);
357   }
358
359   for (i = 0; i < numServices; i++) {
360     xfree(serviceSig[i].ss_host);
361     xfree(serviceSig[i].ss_slice);
362   }
363   xfree(serviceSig);
364   serviceSig = servs;
365   numServices = num;
366   confFileReadTime = statBuf.st_mtime;
367 }
368 /*-----------------------------------------------------------------*/
369 static char *err400BadRequest =
370 "HTTP/1.0 400 Bad Request\r\n"
371 "Content-Type: text/html\r\n"
372 "\r\n"
373 "You are trying to access a PlanetLab node, and your\n"
374 "request header exceeded the allowable size. Please\n"
375 "try again if you believe this error is temporary.\n";
376 /*-----------------------------------------------------------------*/
377 static char *err503Unavailable =
378 "HTTP/1.0 503 Service Unavailable\r\n"
379 "Content-Type: text/html\r\n"
380 "\r\n"
381 "You are trying to access a PlanetLab node, but the service\n"
382 "seems to be unavailable at the moment. Please try again.\n";
383 /*-----------------------------------------------------------------*/
384 static char *err503TooBusy =
385 "HTTP/1.0 503 Service Unavailable\r\n"
386 "Content-Type: text/html\r\n"
387 "\r\n"
388 "You are trying to access a PlanetLab node, but the service\n"
389 "seems to be overloaded at the moment. Please try again.\n";
390 /*-----------------------------------------------------------------*/
391 static void
392 SetFd(int fd, OurFDSet *set)
393 {
394   if (highestSetFd < fd)
395     highestSetFd = fd;
396   FD_SET(fd, set);
397 }
398 /*-----------------------------------------------------------------*/
399 static void
400 ClearFd(int fd, OurFDSet *set)
401 {
402   FD_CLR(fd, set);
403 }
404 /*-----------------------------------------------------------------*/
405 static int
406 RemoveHeader(char *lower, char *real, int totalSize, char *header)
407 {
408   /* returns number of characters removed */
409   char h2[256];
410   int start, end, len;
411   char *temp, *conn;
412
413   sprintf(h2, "\n%s", header);
414
415   if ((conn = strstr(lower, h2)) == NULL)
416     return(0);
417
418   conn++;
419   /* determine how many characters to remove */
420   if ((temp = strchr(conn, '\n')) != NULL)
421     len = (temp - conn) + 1;
422   else
423     len = strlen(conn) + 1;
424   start = conn - lower;
425   end = start + len;
426   memmove(&real[start], &real[end], totalSize - end);
427   memmove(&lower[start], &lower[end], totalSize - end);
428
429   return(len);
430 }
431 /*-----------------------------------------------------------------*/
432 static int
433 InsertHeader(char *buf, int totalSize, char *header)
434 {
435   /* returns number of bytes inserted */
436   
437   char h2[256];
438   char *temp;
439   int len;
440   
441   sprintf(h2, "%s\r\n", header);
442   len = strlen(h2);
443
444   /* if we don't encounter a \n, it means that we have only a single
445      line, and we'd converted the \n to a \0 */
446   if ((temp = strchr(buf, '\n')) == NULL)
447     temp = strchr(buf, '\0');
448   temp++;
449   
450   memmove(temp + len, temp, totalSize - (temp - buf));
451   memcpy(temp, h2, len);
452   
453   return(len);
454 }
455 /*-----------------------------------------------------------------*/
456 static int
457 FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
458 {
459   char *end;
460   char *lowerBuf;
461   char *hostVal;
462   char *buf = fb->fb_buf;
463   char orig[256];
464 #if 0
465   char *url;
466   int i;
467   int len;
468 #endif
469     
470   if (strstr(buf, "\n\r\n") == NULL && strstr(buf, "\n\n") == NULL)
471     return(FAILURE);
472
473   /* insert client info after first line */
474   sprintf(orig, "X-CoDemux-Client: %s", inet_ntoa(addr));
475   fb->fb_used += InsertHeader(buf, fb->fb_used + 1, orig);
476     
477   /* get just the header, so we can work on it */
478   LOCAL_STR_DUP_LOWER(lowerBuf, buf);
479   if ((end = strstr(lowerBuf, "\n\r\n")) == NULL)
480     end = strstr(lowerBuf, "\n\n");
481   *end = '\0';
482   
483   /* remove any existing connection, keep-alive headers, add ours */
484   fb->fb_used -= RemoveHeader(lowerBuf, buf, fb->fb_used + 1, "keep-alive:");
485   fb->fb_used -= RemoveHeader(lowerBuf, buf, fb->fb_used + 1, "connection:");
486   fb->fb_used += InsertHeader(buf, fb->fb_used + 1, "Connection: close");
487   InsertHeader(lowerBuf, fb->fb_used + 1, "connection: close");
488
489   /* isolate host, see if it matches */
490   if ((hostVal = strstr(lowerBuf, "\nhost:")) != NULL) {
491     int i;
492     hostVal += strlen("\nhost:");
493     if ((end = strchr(hostVal, '\n')) != NULL)
494       *end = '\0';
495     if ((end = strchr(hostVal, ':')) != NULL)
496       *end = '\0';
497     while (isspace(*hostVal))
498       hostVal++;
499     if (strlen(hostVal) > 0) {
500       hostVal = GetWord(hostVal, 0);
501       for (i = 1; i < numServices; i++) {
502         if (serviceSig[i].ss_host != NULL &&
503             DoesDotlessSuffixMatch(hostVal, 0, serviceSig[i].ss_host)) {
504           *whichService = i;
505           free(hostVal);
506           /* printf("%s", buf); */
507           return(SUCCESS);
508         }
509       }
510       free(hostVal);
511     }
512   }
513
514 #if 0
515   /* see if URL prefix matches */
516   if ((end = strchr(lowerBuf, '\n')) != NULL)
517     *end = 0;
518   if ((url = GetField(lowerBuf, 1)) == NULL ||
519       url[0] != '/') {
520     /* bad request - let apache handle it ? */
521     *whichService = 0;
522     return(SUCCESS);
523   }
524   url++;                        /* skip the leading slash */
525   for (i = 1; i < numServices; i++) {
526     if (serviceSig[i].ss_prefix != NULL &&
527         (len = strlen(serviceSig[i].ss_prefix)) > 0 &&
528         strncmp(url, serviceSig[i].ss_prefix, len) == 0 &&
529         (url[len] == ' ' || url[len] == '/')) {
530       int startPos = url - lowerBuf;
531       int stripLen = len + ((url[len] == '/') ? 1 : 0);
532       /* strip out prefix */
533       fb->fb_used -= stripLen;
534       memmove(&buf[startPos], &buf[startPos+stripLen], 
535               fb->fb_used + 1 - startPos);
536       /* printf("%s", buf); */
537       *whichService = i;
538       return(SUCCESS);
539     }
540   }
541 #endif
542
543   /* default to first service */
544   *whichService = 0;
545   return(SUCCESS);
546 }
547 /*-----------------------------------------------------------------*/
548 static int
549 StartConnect(int origFD, int whichService)
550 {
551   int sock;
552   struct sockaddr_in dest;
553   SockInfo *si;
554
555   /* create socket */
556   if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
557     return(FAILURE);
558   }
559   
560   /* make socket non-blocking */
561   if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) {
562     close(sock);
563     return(FAILURE);
564   }
565   
566   /* set addr structure */
567   memset(&dest, 0, sizeof(dest));
568   dest.sin_family = AF_INET;
569   dest.sin_port = htons(serviceSig[whichService].ss_port);
570   dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
571   
572   /* start connection process - we should be told that it's in
573      progress */
574   if (connect(sock, (struct sockaddr *) &dest, sizeof(dest)) != -1 || 
575       errno != EINPROGRESS) {
576     close(sock);
577     return(FAILURE);
578   }
579
580   SetFd(sock, &masterWriteSet); /* determine when connect finishes */
581   sockInfo[origFD].si_peerFd = sock;
582   si = &sockInfo[sock];
583   memset(si, 0, sizeof(SockInfo));
584   si->si_peerFd = origFD;
585   si->si_blocked = TRUE;        /* still connecting */
586   si->si_whichService = whichService;
587   si->si_writeBuf = sockInfo[origFD].si_readBuf;
588   sockInfo[origFD].si_readBuf->fb_refs++;
589   if (whichService >= 0)
590     SliceConnsInc(whichService);
591
592   return(SUCCESS);
593 }
594 /*-----------------------------------------------------------------*/
595 static int
596 WriteAvailData(int fd)
597 {
598   SockInfo *si = &sockInfo[fd];
599   FlowBuf *fb = si->si_writeBuf;
600   int res;
601
602   /* printf("trying to write fd %d\n", fd); */
603   if (fb->fb_used < 1 || si->si_blocked)
604     return(SUCCESS);
605
606   /* printf("trying to write %d bytes\n", fb->fb_used); */
607   /* write(STDOUT_FILENO, fb->fb_buf, fb->fb_used); */
608   if ((res = write(fd, fb->fb_buf, fb->fb_used)) > 0) {
609     fb->fb_used -= res;
610     if (fb->fb_used > 0) {
611       /* couldn't write all - assume blocked */
612       memmove(fb->fb_buf, &fb->fb_buf[res], fb->fb_used);
613       si->si_blocked = TRUE;
614       SetFd(fd, &masterWriteSet);
615     }
616     /* printf("wrote %d\n", res); */
617     return(SUCCESS);
618   }
619
620   /* we might have been full but didn't realize it */
621   if (res == -1 && errno == EAGAIN) {
622     si->si_blocked = TRUE;
623     SetFd(fd, &masterWriteSet);
624     return(SUCCESS);
625   }
626
627   /* otherwise, assume the worst */
628   return(FAILURE);
629 }
630 /*-----------------------------------------------------------------*/
631 static OurFDSet socksToCloseVec;
632 static int numSocksToClose;
633 static int whichSocksToClose[TARG_SETSIZE];
634 /*-----------------------------------------------------------------*/
635 static void
636 CloseSock(int fd)
637 {
638   if (FD_ISSET(fd, &socksToCloseVec))
639     return;
640   SetFd(fd, &socksToCloseVec);
641   whichSocksToClose[numSocksToClose] = fd;
642   numSocksToClose++;
643 }
644 /*-----------------------------------------------------------------*/
645 static void
646 DecBuf(FlowBuf *buf)
647 {
648   if (buf == NULL)
649     return;
650   buf->fb_refs--;
651   if (buf->fb_refs == 0) {
652     free(buf->fb_buf);
653     free(buf);
654   }
655 }
656 /*-----------------------------------------------------------------*/
657 static void
658 ReallyCloseSocks(void)
659 {
660   int i;
661
662   memset(&socksToCloseVec, 0, sizeof(socksToCloseVec));
663
664   for (i = 0; i < numSocksToClose; i++) {
665     int fd = whichSocksToClose[i];
666     close(fd);
667     DecBuf(sockInfo[fd].si_readBuf);
668     DecBuf(sockInfo[fd].si_writeBuf);
669     ClearFd(fd, &masterReadSet);
670     ClearFd(fd, &masterWriteSet);
671     if (sockInfo[fd].si_needsHeaderSince) {
672       sockInfo[fd].si_needsHeaderSince = 0;
673       numNeedingHeaders--;
674     }
675     if (sockInfo[fd].si_whichService >= 0) {
676       SliceConnsDec(sockInfo[fd].si_whichService);
677       sockInfo[fd].si_whichService = -1;
678     }
679     /* KyoungSoo*/
680     if (sockInfo[fd].si_peerFd >= 0) {
681       sockInfo[sockInfo[fd].si_peerFd].si_peerFd = -1;
682     }
683   }
684   numSocksToClose = 0;
685 }
686 /*-----------------------------------------------------------------*/
687 static void
688 SocketReadyToRead(int fd)
689 {
690   SockInfo *si = &sockInfo[fd];
691   int spaceLeft;
692   FlowBuf *fb;
693   int res;
694
695   /* if peer is closed, close ourselves */
696   if (si->si_peerFd < 0 && (!si->si_needsHeaderSince)) {
697     CloseSock(fd);
698     return;
699   }
700
701   if ((fb = si->si_readBuf) == NULL) {
702     fb = si->si_readBuf = xcalloc(1, sizeof(FlowBuf));
703     fb->fb_refs = 1;
704     if (si->si_peerFd >= 0) {
705       sockInfo[si->si_peerFd].si_writeBuf = fb;
706       fb->fb_refs = 2;
707     }
708   }
709
710   if (fb->fb_buf == NULL)
711     fb->fb_buf = xmalloc(FB_ALLOCSIZE);
712
713   /* determine read buffer size - if 0, then block reads and return */
714   if ((spaceLeft = FB_SIZE - fb->fb_used) <= 0) {
715     if (si->si_needsHeaderSince) {
716       write(fd, err400BadRequest, strlen(err400BadRequest));
717       CloseSock(fd);
718       return;
719     }
720     else {
721       ClearFd(fd, &masterReadSet);
722       return;
723     }
724   } 
725   
726   /* read as much as allowed, and is available */
727   if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) {
728     CloseSock(fd);
729     if (fb->fb_used == 0 && si->si_peerFd >= 0) {
730       CloseSock(si->si_peerFd);
731       si->si_peerFd = -1;
732     }
733     return;
734   }
735   if (res == -1) {
736     if (errno == EAGAIN)
737       return;
738     TRACE("fd=%d errno=%d errstr=%s\n",fd, errno, strerror(errno));
739     CloseSock(fd);
740     if (fb->fb_used == 0 && si->si_peerFd >= 0) {
741       CloseSock(si->si_peerFd);
742       si->si_peerFd = -1;
743     }
744     return;
745   }
746   fb->fb_used += res;
747   fb->fb_buf[fb->fb_used] = 0;  /* terminate it for convenience */
748   //  printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
749
750   /* if we need header, check if we've gotten it. if so, do
751      modifications and continue. if not, check if we've read the
752      maximum, and if so, fail */
753   if (si->si_needsHeaderSince) {
754     int whichService;
755     SliceInfo *slice;
756
757 #define STATUS_REQ "GET /codemux/status.txt"
758     if (strncasecmp(fb->fb_buf, STATUS_REQ, sizeof(STATUS_REQ)-1) == 0) {
759       DumpStatus(fd);
760       CloseSock(fd);
761       return;
762     }
763
764     //    printf("trying to find service\n");
765     if (FindService(fb, &whichService, si->si_cliAddr) != SUCCESS)
766       return;
767     //    printf("found service %d\n", whichService);
768     slice = ServiceToSlice(whichService);
769
770     /* if it needs to be redirected to PLC, let it be handled here */
771     if (whichService == 0 && domainNamePLCNetflow != NULL &&
772         strcmp(slice->si_sliceName, "root") == 0) {
773       char msg[1024];
774       int len;
775       static const char* resp302 = 
776         "HTTP/1.0 302 Found\r\n"
777         "Location: http://%s\r\n"
778         "Cache-Control: no-cache, no-store\r\n"
779         "Content-type: text/html\r\n"
780         "Connection: close\r\n"
781         "\r\n"
782         "Your request is being redirected to PLC Netflow http://%s\n";
783       len = snprintf(msg, sizeof(msg), resp302, 
784                      domainNamePLCNetflow, domainNamePLCNetflow);
785       write(fd, msg, len);
786       CloseSock(fd);
787       return;
788     }
789     /* no service can have more than some absolute max number of
790        connections. Also, when we're too busy, start enforcing
791        fairness across the servers */
792     if (slice->si_numConns > SERVICE_MAX ||
793         (numTotalSliceConns > FAIRNESS_CUTOFF && 
794          slice->si_numConns > MAX_CONNS/numActiveSlices)) {
795       write(fd, err503TooBusy, strlen(err503TooBusy));
796       TRACE("CloseSock(): fd=%d too busy\n", fd);
797       CloseSock(fd);
798       return;
799     }
800
801     if (slice->si_xid > 0) {
802       static int first = 1;
803       setsockopt(fd, SOL_SOCKET, SO_SETXID, 
804                  &slice->si_xid, sizeof(slice->si_xid));
805       if (first) {
806         /* just to log it for once */
807         fprintf(stderr, "setsockopt() with XID = %d name = %s\n", 
808                 slice->si_xid, slice->si_sliceName);
809         first = 0;
810       }
811     }
812
813     si->si_needsHeaderSince = 0;
814     numNeedingHeaders--;
815     if (StartConnect(fd, whichService) != SUCCESS) {
816       write(fd, err503Unavailable, strlen(err503Unavailable));
817       TRACE("CloseSock(): fd=%d StartConnect() failed\n", fd);
818       CloseSock(fd);
819       return;
820     }
821     return;
822   }
823
824   /* write anything possible */
825   if (WriteAvailData(si->si_peerFd) != SUCCESS) {
826     /* assume the worst and close */
827     TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
828           fd, errno, strerror(errno));
829     CloseSock(fd);
830     if (si->si_peerFd >=0) {
831       CloseSock(si->si_peerFd);
832       si->si_peerFd = -1;
833     }
834   }
835 }
836 /*-----------------------------------------------------------------*/
837 static void
838 SocketReadyToWrite(int fd)
839 {
840   SockInfo *si = &sockInfo[fd];
841
842   /* unblock it and read what it has */
843   si->si_blocked = FALSE;
844   ClearFd(fd, &masterWriteSet);
845   SetFd(fd, &masterReadSet);
846   
847   /* enable reading on peer just in case it was off */
848   if (si->si_peerFd >= 0)
849     SetFd(si->si_peerFd, &masterReadSet);
850     
851   /* if we have data, write it */
852   if (WriteAvailData(fd) != SUCCESS) {
853    /* assume the worst and close */
854     TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
855           fd, errno, strerror(errno));
856     CloseSock(fd);
857     if (si->si_peerFd >= 0) {
858       CloseSock(si->si_peerFd);
859       si->si_peerFd = -1;
860     }
861     return;
862   }
863
864   /* if peer is closed and we're done writing, we should close */
865   if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0) {
866     CloseSock(fd);
867   }
868 }
869 /*-----------------------------------------------------------------*/
870 static void
871 CloseReqlessConns(void)
872 {
873   static int lastSweep;
874   int maxAge;
875   int i;
876
877   if (lastSweep == now)
878     return;
879   lastSweep = now;
880
881   if (numTotalSliceConns + numNeedingHeaders > MAX_CONNS ||
882       numNeedingHeaders > TARG_SETSIZE/20) {
883     /* second condition is probably an attack - close aggressively */
884     maxAge = 5;
885   }
886   else if (numTotalSliceConns + numNeedingHeaders > FAIRNESS_CUTOFF ||
887            numNeedingHeaders > TARG_SETSIZE/40) {
888     /* sweep a little aggressively */
889     maxAge = 10;
890   }
891   else if (numNeedingHeaders > TARG_SETSIZE/80) {
892     /* just sweep to close strays */
893     maxAge = 30;
894   }
895   else {
896     /* too little gained - not worth sweeping */
897     return;
898   }
899
900   /* if it's too old, close it */
901   for (i = 0; i < highestSetFd+1; i++) {
902     if (sockInfo[i].si_needsHeaderSince &&
903         (now - sockInfo[i].si_needsHeaderSince) > maxAge) 
904       CloseSock(i);
905   }
906 }
907 /*-----------------------------------------------------------------*/
908 static void
909 MainLoop(int lisSock)
910 {
911   int i;
912   OurFDSet tempReadSet, tempWriteSet;
913   int res;
914   int lastConfCheck = 0;
915
916   signal(SIGPIPE, SIG_IGN);
917
918   while (1) {
919     int newSock;
920     int ceiling;
921     struct timeval timeout;
922
923     now = time(NULL);
924
925     if (now - lastConfCheck > 300) {
926       ReadConfFile();
927       GetSliceXids();           /* always call - in case new slices created */
928       lastConfCheck = now;
929     }
930
931     /* see if there's any activity */
932     tempReadSet = masterReadSet;
933     tempWriteSet = masterWriteSet;
934
935     /* trim it down if needed */
936     while (highestSetFd > 1 &&
937            (!FD_ISSET(highestSetFd, &tempReadSet)) &&
938            (!FD_ISSET(highestSetFd, &tempWriteSet)))
939       highestSetFd--;
940     timeout.tv_sec = 1;
941     timeout.tv_usec = 0;
942     res = select(highestSetFd+1, (fd_set *) &tempReadSet, 
943                  (fd_set *) &tempWriteSet, NULL, &timeout);
944     if (res < 0 && errno != EINTR) {
945       perror("select");
946       exit(-1);
947     }
948
949     now = time(NULL);
950
951     /* clear the bit for listen socket to avoid confusion */
952     ClearFd(lisSock, &tempReadSet);
953     
954     ceiling = highestSetFd+1;   /* copy it, since it changes during loop */
955     /* pass data back and forth as needed */
956     for (i = 0; i < ceiling; i++) {
957       if (FD_ISSET(i, &tempWriteSet))
958         SocketReadyToWrite(i);
959     }
960     for (i = 0; i < ceiling; i++) {
961       if (FD_ISSET(i, &tempReadSet))
962         SocketReadyToRead(i);
963     }
964
965     /* see if we need to close conns w/o requests */
966     CloseReqlessConns();
967     
968     /* do all closes */
969     ReallyCloseSocks();
970
971     /* try accepting new connections */
972     do {
973       struct sockaddr_in addr;
974       socklen_t lenAddr = sizeof(addr);
975       if ((newSock = accept(lisSock, (struct sockaddr *) &addr, 
976                             &lenAddr)) >= 0) {
977         /* make socket non-blocking */
978         if (fcntl(newSock, F_SETFL, O_NONBLOCK) < 0) {
979           close(newSock);
980           continue;
981         }
982         memset(&sockInfo[newSock], 0, sizeof(SockInfo));
983         sockInfo[newSock].si_needsHeaderSince = now;
984         numNeedingHeaders++;
985         sockInfo[newSock].si_peerFd = -1;
986         sockInfo[newSock].si_cliAddr = addr.sin_addr;
987         sockInfo[newSock].si_whichService = -1;
988         SetFd(newSock, &masterReadSet);
989       }
990     } while (newSock >= 0);
991   }
992 }
993 /*-----------------------------------------------------------------*/
994 static int 
995 InitDaemon(void)
996 {
997   pid_t pid;
998   FILE *pidfile;
999   
1000   pidfile = fopen(PIDFILE, "w");
1001   if (pidfile == NULL) {
1002     fprintf(stderr, "%s creation failed\n", PIDFILE);
1003     return(-1);
1004   }
1005
1006   if ((pid = fork()) < 0) {
1007     fclose(pidfile);
1008     return(-1);
1009   }
1010   else if (pid != 0) {
1011     /* i'm the parent, writing down the child pid  */
1012     fprintf(pidfile, "%u\n", pid);
1013     fclose(pidfile);
1014     exit(0);
1015   }
1016
1017   /* close the pid file */
1018   fclose(pidfile);
1019
1020   /* routines for any daemon process
1021      1. create a new session 
1022      2. change directory to the root
1023      3. change the file creation permission 
1024   */
1025   setsid();
1026   chdir("/");
1027   umask(0);
1028
1029   return(0);
1030 }
1031 /*-----------------------------------------------------------------*/
1032 static int
1033 OpenLogFile(void)
1034 {
1035   static const char* logfile = "/var/log/codemux.log";
1036   int logfd;
1037
1038   logfd = open(logfile, O_WRONLY | O_APPEND | O_CREAT, 0600);
1039   if (logfd < 0) {
1040     fprintf(stderr, "cannot open the logfile err=%s\n",
1041             strerror(errno));
1042     exit(-1);
1043   }
1044
1045   /* duplicate logfile to stderr */
1046   if (dup2(logfd, STDERR_FILENO) != STDERR_FILENO) {
1047     fprintf(stderr, "cannot open the logfile err=%s\n",
1048             strerror(errno));
1049     exit(-1);
1050   }
1051   
1052   /* set the close-on-exec flag */
1053   if (fcntl(STDERR_FILENO, F_SETFD, 1) != 0) {
1054     fprintf(stderr, "fcntl to set the close-on-exec flag failed err=%s\n",
1055             strerror(errno));
1056     exit(-1);
1057   }
1058
1059   return logfd;
1060 }
1061 /*-----------------------------------------------------------------*/
1062 int
1063 main(int argc, char *argv[])
1064 {
1065   int lisSock;
1066   int logFd;
1067
1068   /* do the daemon stuff */
1069   if (argc <= 1 || strcmp(argv[1], "-d") != 0) {
1070     if (InitDaemon() < 0) {
1071       fprintf(stderr, "codemux daemon_init() failed\n");
1072       exit(-1);
1073     }
1074   }
1075
1076   /* create the accept socket */
1077   if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE)) < 0) {
1078     fprintf(stderr, "failed creating accept socket\n");
1079     exit(-1);
1080   }
1081   SetFd(lisSock, &masterReadSet);
1082
1083   /* open the log file */
1084   logFd = OpenLogFile();
1085
1086
1087   /* write down the version */
1088   fprintf(stderr, "CoDemux version %s started\n", CODEMUX_VERSION);
1089
1090   while (1) {
1091     numForks++;
1092     if (fork()) {
1093       /* this is the parent - just wait */
1094       while (wait3(NULL, 0, NULL) < 1)
1095         ;                       /* just keep waiting for a real pid */
1096     }
1097     else {
1098       /* child process */
1099       MainLoop(lisSock);
1100       exit(-1);
1101     }
1102   }
1103 }
1104 /*-----------------------------------------------------------------*/