Setting tag codemux-0.1-15
[codemux.git] / codemux.c
1 #include <sys/types.h>
2 #include <sys/socket.h>
3 #include <sys/stat.h>
4 #include <sys/wait.h>
5 #include <netinet/in.h>
6 #include <arpa/inet.h>
7 #include <ctype.h>
8 #include <errno.h>
9 #include <fcntl.h>
10 #include <signal.h>
11 #include <stdio.h>
12 #include <stdlib.h>
13 #include <time.h>
14 #include <unistd.h>
15 #include <string.h>
16 #include "codemuxlib.h"
17 #include "debug.h"
18
19 #ifdef DEBUG
20 HANDLE hdebugLog;
21 int defaultTraceSync;
22 #endif
23
24 #define CONF_FILE "/etc/codemux/codemux.conf"
25 #define DEMUX_PORT 80
26 #define PIDFILE "/var/run/codemux.pid"
27 #define TARG_SETSIZE 4096
28
29 /* set aside some small number of fds for us, allow the rest for
30    connections */
31 #define MAX_CONNS ((TARG_SETSIZE-20)/2)
32
33 /* no single service can take more than half the connections */
34 #define SERVICE_MAX (MAX_CONNS/2)
35
36 /* how many total connections before we get concerned about fairness
37    among them */
38 #define FAIRNESS_CUTOFF (MAX_CONNS * 0.85)
39
40 /* codemux version, from Makefile, or specfile */
41 #define CODEMUX_VERSION RPM_VERSION
42
43 typedef struct FlowBuf {
44   int fb_refs;                  /* num refs */
45   char *fb_buf;                 /* actual buffer */
46   int fb_used;                  /* bytes used in buffer */
47 } FlowBuf;
48 #define FB_SIZE 3800            /* max usable size */
49 #define FB_ALLOCSIZE 4000       /* extra to include IP address */
50
51 typedef struct SockInfo {
52   int si_peerFd;                /* fd of peer */
53   struct in_addr si_cliAddr;    /* address of client */
54   int si_blocked;               /* are we blocked? */
55   int si_needsHeaderSince;      /* since when are we waiting for a header */
56   int si_whichService;          /* index of service */
57   FlowBuf *si_readBuf;          /* read data into this buffer */
58   FlowBuf *si_writeBuf;         /* drain this buffer for writing */
59 } SockInfo;
60
61 static SockInfo sockInfo[TARG_SETSIZE]; /* fd number of peer socket */
62
63 typedef struct ServiceSig {
64   char *ss_host;                /* suffix in host */
65   char *ss_slice;
66   short ss_port;
67   char *ss_ip;
68   int ss_slicePos;              /* position in slices array */
69 } ServiceSig;
70
71 static ServiceSig *serviceSig;
72 static int numServices;
73 static int confFileReadTime;
74 static int now;
75
76 typedef struct SliceInfo {
77   char *si_sliceName;
78   int si_inUse;                 /* do any services refer to this? */
79   int si_numConns;
80   int si_xid;
81 } SliceInfo;
82
83 static SliceInfo *slices;
84 static int numSlices;
85 static int numActiveSlices;
86 static int numTotalSliceConns;
87 static int anySliceXidsNeeded;
88
89 typedef struct OurFDSet {
90   long __fds_bits[TARG_SETSIZE/32];
91 } OurFDSet;
92 static OurFDSet masterReadSet, masterWriteSet;
93 static int highestSetFd;
94 static int numNeedingHeaders;   /* how many conns waiting on headers? */
95
96 static int numForks;
97
98 /* PLC netflow domain name like netflow.planet-lab.org */
99 static char* domainNamePLCNetflow = NULL;
100
101 #ifndef SO_SETXID
102 #define SO_SETXID SO_PEERCRED
103 #endif
104 /*-----------------------------------------------------------------*/
105 static SliceInfo *
106 ServiceToSlice(int whichService)
107 {
108   if (whichService < 0)
109     return(NULL);
110   return(&slices[serviceSig[whichService].ss_slicePos]);
111 }
112 /*-----------------------------------------------------------------*/
113 static void
114 DumpStatus(int fd)
115 {
116   char buf[65535];
117   char *start = buf;
118   int i;
119   int len;
120
121   sprintf(start, 
122           "CoDemux version %s\n"
123           "numForks %d, numActiveSlices %d, numTotalSliceConns %d\n"
124           "numNeedingHeaders %d, anySliceXidsNeeded %d\n",
125           CODEMUX_VERSION,
126           numForks, numActiveSlices, numTotalSliceConns,
127           numNeedingHeaders, anySliceXidsNeeded);
128   start += strlen(start);
129
130   for (i = 0; i < numSlices; i++) {
131     SliceInfo *si = &slices[i];
132     sprintf(start, "Slice %d: %s xid %d, %d conns, inUse %d\n", 
133             i, si->si_sliceName, si->si_xid, si->si_numConns,
134             si->si_inUse);
135     start += strlen(start);
136   }
137
138   for (i = 0; i < numServices; i++) {
139     ServiceSig *ss = &serviceSig[i];
140     sprintf(start, "Service %d: %s %s port %d, slice# %d\n", i, ss->ss_host,
141             ss->ss_slice, (int) ss->ss_port, ss->ss_slicePos);
142     start += strlen(start);
143   }
144
145   len = start - buf;
146   write(fd, buf, len);
147 }
148 /*-----------------------------------------------------------------*/
149 static void
150 GetSliceXids(void)
151 {
152   /* walks through /etc/passwd, and gets the uid for every slice we
153      have */
154   FILE *f;
155   char *line;
156   int i;
157
158   if (!anySliceXidsNeeded)
159     return;
160
161   for (i = 0; i < numSlices; i++) {
162     SliceInfo *si = &slices[i];
163     si->si_inUse = 0;
164   }
165   for (i = 0; i < numServices; i++) {
166     SliceInfo *si = ServiceToSlice(i);
167     if (si != NULL)
168       si->si_inUse++;
169   }  
170
171   if ((f = fopen("/etc/passwd", "r")) == NULL)
172     return;
173
174   while ((line = GetNextLine(f)) != NULL) {
175     char *temp;
176     int xid;
177
178     if ((temp = strchr(line, ':')) == NULL)
179       goto next_line;                   /* weird line */
180     *temp = '\0';               /* terminate slice name */
181     temp++;
182     if ((temp = strchr(temp+1, ':')) == NULL)
183       goto next_line;   /* weird line */
184     if ((xid = atoi(temp+1)) < 1)
185       goto next_line;   /* weird xid */
186     
187     /* we've got a slice name and xid, let's try to match */
188     for (i = 0; i < numSlices; i++) {
189       if (slices[i].si_xid == 0 &&
190           strcasecmp(slices[i].si_sliceName, line) == 0) {
191         slices[i].si_xid = xid;
192         break;
193       }
194     }
195   next_line:
196     if (line)
197       xfree(line);
198   }
199
200   /* assume service 0 is the root service, and don't check it since
201      it'll have xid zero */
202   anySliceXidsNeeded = FALSE;
203   for (i = 1; i < numSlices; i++) {
204     if (slices[i].si_xid == 0 && slices[i].si_inUse > 0) {
205       anySliceXidsNeeded = TRUE;
206       break;
207     }
208   }
209
210   fclose(f);
211 }
212 /*-----------------------------------------------------------------*/
213 static void
214 SliceConnsInc(int whichService)
215 {
216   SliceInfo *si = ServiceToSlice(whichService);
217
218   if (si == NULL)
219     return;
220   numTotalSliceConns++;
221   si->si_numConns++;
222   if (si->si_numConns == 1)
223     numActiveSlices++;
224 }
225 /*-----------------------------------------------------------------*/
226 static void
227 SliceConnsDec(int whichService)
228 {
229   SliceInfo *si = ServiceToSlice(whichService);
230
231   if (si == NULL)
232     return;
233   numTotalSliceConns--;
234   si->si_numConns--;
235   if (si->si_numConns == 0)
236     numActiveSlices--;
237 }
238 /*-----------------------------------------------------------------*/
239 static int
240 WhichSlicePos(char *slice)
241 {
242   /* adds the new slice if necessary, returns the index into slice
243      array. Never change the ordering of existing slices */
244   int i;
245   static int numSlicesAlloc;
246
247   for (i = 0; i < numSlices; i++) {
248     if (strcasecmp(slice, slices[i].si_sliceName) == 0)
249       return(i);
250   }
251
252   if (numSlices >= numSlicesAlloc) {
253     numSlicesAlloc = MAX(8, numSlicesAlloc * 2);
254     slices = xrealloc(slices, numSlicesAlloc * sizeof(SliceInfo));
255   }
256
257   memset(&slices[numSlices], 0, sizeof(SliceInfo));
258   slices[numSlices].si_sliceName = xstrdup(slice);
259   numSlices++;
260   return(numSlices-1);
261 }
262 /*-----------------------------------------------------------------*/
263 static void
264 ReadConfFile(void)
265 {
266   int numAlloc = 0;
267   int num = 0;
268   ServiceSig *servs = NULL;
269   FILE *f;
270   char *line = NULL;
271   struct stat statBuf;
272   int i;
273
274   if (stat(CONF_FILE, &statBuf) != 0) {
275     fprintf(stderr, "failed stat on codemux.conf\n");
276     if (numServices)
277       return;
278     exit(-1);
279   }
280   if (statBuf.st_mtime == confFileReadTime)
281     return;
282
283   if ((f = fopen(CONF_FILE, "r")) == NULL) {
284     fprintf(stderr, "failed reading codemux.conf\n");
285     if (numServices)
286       return;
287     exit(-1);
288   }
289
290   /* conf file entries look like
291      coblitz.codeen.org princeton_coblitz 3125
292   */
293
294   while (1) {
295     ServiceSig serv;
296     int port;
297     if (line != NULL)
298       xfree(line);
299     
300     if ((line = GetNextLine(f)) == NULL)
301       break;
302
303     memset(&serv, 0, sizeof(serv));
304     if (WordCount(line) < 3) {
305       fprintf(stderr, "bad line: %s\n", line);
306       continue;
307     }
308     serv.ss_port = port = atoi(GetField(line, 2));
309     if (port < 1 || port > 65535 || port == DEMUX_PORT) {
310       fprintf(stderr, "bad port: %s\n", line);
311       continue;
312     }
313
314     serv.ss_host = GetWord(line, 0);
315     serv.ss_slice = GetWord(line, 1);
316     serv.ss_ip = GetWord(line, 3);
317
318     if (num == 0) {
319       /* the first row must be an entry for apache */
320       if (strcmp(serv.ss_host, "*") != 0 ||
321           strcmp(serv.ss_slice, "root") != 0) {
322         fprintf(stderr, "first row has to be for webserver\n");
323         exit(-1);
324       }
325       /* see if there's PLC netflow's domain name */
326       if (domainNamePLCNetflow != NULL) {
327         xfree(domainNamePLCNetflow);
328         domainNamePLCNetflow = NULL;
329       }
330       domainNamePLCNetflow = GetWord(line, 3);
331     }
332     if (num >= numAlloc) {
333       numAlloc = MAX(numAlloc * 2, 8);
334       servs = xrealloc(servs, numAlloc * sizeof(ServiceSig));
335     }
336     serv.ss_slicePos = WhichSlicePos(serv.ss_slice);
337     if (slices[serv.ss_slicePos].si_inUse == 0 &&
338         slices[serv.ss_slicePos].si_xid < 1)
339       anySliceXidsNeeded = TRUE; /* if new/inactive, we need xid */
340     servs[num] = serv;
341     num++;
342   }
343
344   fclose(f);
345
346 #if 0
347   /* Faiyaz asked me to allow a single-entry codemux conf */
348   if (num == 1) {
349     if (numServices == 0) {
350       fprintf(stderr, "nothing found in codemux.conf\n");
351       exit(-1);
352     }
353     return;
354   }
355 #endif
356   if (num < 1) {
357     fprintf(stderr, "no entry found in codemux.conf\n");
358     exit(-1);
359   }
360
361   for (i = 0; i < numServices; i++) {
362     xfree(serviceSig[i].ss_host);
363     xfree(serviceSig[i].ss_ip);
364     xfree(serviceSig[i].ss_slice);
365   }
366   xfree(serviceSig);
367   serviceSig = servs;
368   numServices = num;
369   confFileReadTime = statBuf.st_mtime;
370 }
371 /*-----------------------------------------------------------------*/
372 static char *err400BadRequest =
373 "HTTP/1.0 400 Bad Request\r\n"
374 "Content-Type: text/html\r\n"
375 "\r\n"
376 "You are trying to access a PlanetLab node, and your\n"
377 "request header exceeded the allowable size. Please\n"
378 "try again if you believe this error is temporary.\n";
379 /*-----------------------------------------------------------------*/
380 static char *err503Unavailable =
381 "HTTP/1.0 503 Service Unavailable\r\n"
382 "Content-Type: text/html\r\n"
383 "\r\n"
384 "You are trying to access a PlanetLab node, but the service\n"
385 "seems to be unavailable at the moment. Please try again.\n";
386 /*-----------------------------------------------------------------*/
387 static char *err503TooBusy =
388 "HTTP/1.0 503 Service Unavailable\r\n"
389 "Content-Type: text/html\r\n"
390 "\r\n"
391 "You are trying to access a PlanetLab node, but the service\n"
392 "seems to be overloaded at the moment. Please try again.\n";
393 /*-----------------------------------------------------------------*/
394 static void
395 SetFd(int fd, OurFDSet *set)
396 {
397   if (highestSetFd < fd)
398     highestSetFd = fd;
399   FD_SET(fd, set);
400 }
401 /*-----------------------------------------------------------------*/
402 static void
403 ClearFd(int fd, OurFDSet *set)
404 {
405   FD_CLR(fd, set);
406 }
407 /*-----------------------------------------------------------------*/
408 static int
409 RemoveHeader(char *lower, char *real, int totalSize, char *header)
410 {
411   /* returns number of characters removed */
412   char h2[256];
413   int start, end, len;
414   char *temp, *conn;
415
416   sprintf(h2, "\n%s", header);
417
418   if ((conn = strstr(lower, h2)) == NULL)
419     return(0);
420
421   conn++;
422   /* determine how many characters to remove */
423   if ((temp = strchr(conn, '\n')) != NULL)
424     len = (temp - conn) + 1;
425   else
426     len = strlen(conn) + 1;
427   start = conn - lower;
428   end = start + len;
429   memmove(&real[start], &real[end], totalSize - end);
430   memmove(&lower[start], &lower[end], totalSize - end);
431
432   return(len);
433 }
434 /*-----------------------------------------------------------------*/
435 static int
436 InsertHeader(char *buf, int totalSize, char *header)
437 {
438   /* returns number of bytes inserted */
439   
440   char h2[256];
441   char *temp;
442   int len;
443   
444   sprintf(h2, "%s\r\n", header);
445   len = strlen(h2);
446
447   /* if we don't encounter a \n, it means that we have only a single
448      line, and we'd converted the \n to a \0 */
449   if ((temp = strchr(buf, '\n')) == NULL)
450     temp = strchr(buf, '\0');
451   temp++;
452   
453   memmove(temp + len, temp, totalSize - (temp - buf));
454   memcpy(temp, h2, len);
455   
456   return(len);
457 }
458 /*-----------------------------------------------------------------*/
459 static int
460 FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
461 {
462   char *end;
463   char lowerBuf[FB_ALLOCSIZE];
464   char *hostVal;
465   char *buf = fb->fb_buf;
466   char orig[256];
467 #if 0
468   char *url;
469   int i;
470   int len;
471 #endif
472
473   if (strstr(buf, "\n\r\n") == NULL && strstr(buf, "\n\n") == NULL)
474     return(FAILURE);
475
476   /* insert client info after first line */
477   sprintf(orig, "X-CoDemux-Client: %s", inet_ntoa(addr));
478   fb->fb_used += InsertHeader(buf, fb->fb_used + 1, orig);
479     
480   /* get just the header, so we can work on it */
481   StrcpyLower(lowerBuf, buf);
482   if ((end = strstr(lowerBuf, "\n\r\n")) == NULL)
483     end = strstr(lowerBuf, "\n\n");
484   *end = '\0';
485   
486   /* remove any existing connection, keep-alive headers, add ours */
487   fb->fb_used -= RemoveHeader(lowerBuf, buf, fb->fb_used + 1, "keep-alive:");
488   fb->fb_used -= RemoveHeader(lowerBuf, buf, fb->fb_used + 1, "connection:");
489   fb->fb_used += InsertHeader(buf, fb->fb_used + 1, "Connection: close");
490   InsertHeader(lowerBuf, fb->fb_used + 1, "connection: close");
491
492   /* isolate host, see if it matches */
493   if ((hostVal = strstr(lowerBuf, "\nhost:")) != NULL) {
494     int i;
495     hostVal += strlen("\nhost:");
496     if ((end = strchr(hostVal, '\n')) != NULL)
497       *end = '\0';
498     if ((end = strchr(hostVal, ':')) != NULL)
499       *end = '\0';
500     while (isspace(*hostVal))
501       hostVal++;
502     if (strlen(hostVal) > 0) {
503       hostVal = GetWord(hostVal, 0);
504       for (i = 1; i < numServices; i++) {
505         if (serviceSig[i].ss_host != NULL &&
506             DoesDotlessSuffixMatch(hostVal, 0, serviceSig[i].ss_host)) {
507           *whichService = i;
508           free(hostVal);
509           return(SUCCESS);
510         }
511       }
512       free(hostVal);
513     }
514   }
515
516 #if 0
517   /* see if URL prefix matches */
518   if ((end = strchr(lowerBuf, '\n')) != NULL)
519     *end = 0;
520   if ((url = GetField(lowerBuf, 1)) == NULL ||
521       url[0] != '/') {
522     /* bad request - let apache handle it ? */
523     *whichService = 0;
524     return(SUCCESS);
525   }
526   url++;                        /* skip the leading slash */
527   for (i = 1; i < numServices; i++) {
528     if (serviceSig[i].ss_prefix != NULL &&
529         (len = strlen(serviceSig[i].ss_prefix)) > 0 &&
530         strncmp(url, serviceSig[i].ss_prefix, len) == 0 &&
531         (url[len] == ' ' || url[len] == '/')) {
532       int startPos = url - lowerBuf;
533       int stripLen = len + ((url[len] == '/') ? 1 : 0);
534       /* strip out prefix */
535       fb->fb_used -= stripLen;
536       memmove(&buf[startPos], &buf[startPos+stripLen], 
537               fb->fb_used + 1 - startPos);
538       /* printf("%s", buf); */
539       *whichService = i;
540       return(SUCCESS);
541     }
542   }
543 #endif
544
545   /* default to first service */
546   *whichService = 0;
547   return(SUCCESS);
548 }
549 /*-----------------------------------------------------------------*/
550 static int
551 StartConnect(int origFD, int whichService)
552 {
553   int sock;
554   struct sockaddr_in dest;
555   SockInfo *si;
556
557   /* create socket */
558   if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
559     return(FAILURE);
560   }
561   
562   /* make socket non-blocking */
563   if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) {
564     close(sock);
565     return(FAILURE);
566   }
567   
568   /* set addr structure */
569   memset(&dest, 0, sizeof(dest));
570   dest.sin_family = AF_INET;
571   dest.sin_port = htons(serviceSig[whichService].ss_port);
572   if (serviceSig[whichService].ss_ip != NULL) {
573         dest.sin_addr.s_addr = inet_addr(serviceSig[whichService].ss_ip);
574   } else {
575         dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
576   }
577
578   /* start connection process - we should be told that it's in
579      progress */
580   if (connect(sock, (struct sockaddr *) &dest, sizeof(dest)) != -1 || 
581       errno != EINPROGRESS) {
582     close(sock);
583     return(FAILURE);
584   }
585
586   SetFd(sock, &masterWriteSet); /* determine when connect finishes */
587   sockInfo[origFD].si_peerFd = sock;
588   si = &sockInfo[sock];
589   memset(si, 0, sizeof(SockInfo));
590   si->si_peerFd = origFD;
591   si->si_blocked = TRUE;        /* still connecting */
592   si->si_whichService = whichService;
593   si->si_writeBuf = sockInfo[origFD].si_readBuf;
594   sockInfo[origFD].si_readBuf->fb_refs++;
595   if (whichService >= 0)
596     SliceConnsInc(whichService);
597
598   return(SUCCESS);
599 }
600 /*-----------------------------------------------------------------*/
601 static int
602 WriteAvailData(int fd)
603 {
604   SockInfo *si = &sockInfo[fd];
605   FlowBuf *fb = si->si_writeBuf;
606   int res;
607
608   /* printf("trying to write fd %d\n", fd); */
609   if (fb->fb_used < 1 || si->si_blocked)
610     return(SUCCESS);
611
612   /* printf("trying to write %d bytes\n", fb->fb_used); */
613   /* write(STDOUT_FILENO, fb->fb_buf, fb->fb_used); */
614   if ((res = write(fd, fb->fb_buf, fb->fb_used)) > 0) {
615     fb->fb_used -= res;
616     if (fb->fb_used > 0) {
617       /* couldn't write all - assume blocked */
618       memmove(fb->fb_buf, &fb->fb_buf[res], fb->fb_used);
619       si->si_blocked = TRUE;
620       SetFd(fd, &masterWriteSet);
621     }
622     /* printf("wrote %d\n", res); */
623     return(SUCCESS);
624   }
625
626   /* we might have been full but didn't realize it */
627   if (res == -1 && errno == EAGAIN) {
628     si->si_blocked = TRUE;
629     SetFd(fd, &masterWriteSet);
630     return(SUCCESS);
631   }
632
633   /* otherwise, assume the worst */
634   return(FAILURE);
635 }
636 /*-----------------------------------------------------------------*/
637 static OurFDSet socksToCloseVec;
638 static int numSocksToClose;
639 static int whichSocksToClose[TARG_SETSIZE];
640 /*-----------------------------------------------------------------*/
641 static void
642 CloseSock(int fd)
643 {
644   if (FD_ISSET(fd, &socksToCloseVec))
645     return;
646   SetFd(fd, &socksToCloseVec);
647   whichSocksToClose[numSocksToClose] = fd;
648   numSocksToClose++;
649 }
650 /*-----------------------------------------------------------------*/
651 static void
652 DecBuf(FlowBuf *buf)
653 {
654   if (buf == NULL)
655     return;
656   buf->fb_refs--;
657   if (buf->fb_refs == 0) {
658     free(buf->fb_buf);
659     free(buf);
660   }
661 }
662 /*-----------------------------------------------------------------*/
663 static void
664 ReallyCloseSocks(void)
665 {
666   int i;
667
668   memset(&socksToCloseVec, 0, sizeof(socksToCloseVec));
669
670   for (i = 0; i < numSocksToClose; i++) {
671     int fd = whichSocksToClose[i];
672     close(fd);
673     DecBuf(sockInfo[fd].si_readBuf);
674     DecBuf(sockInfo[fd].si_writeBuf);
675     ClearFd(fd, &masterReadSet);
676     ClearFd(fd, &masterWriteSet);
677     if (sockInfo[fd].si_needsHeaderSince) {
678       sockInfo[fd].si_needsHeaderSince = 0;
679       numNeedingHeaders--;
680     }
681     if (sockInfo[fd].si_whichService >= 0) {
682       SliceConnsDec(sockInfo[fd].si_whichService);
683       sockInfo[fd].si_whichService = -1;
684     }
685     /* KyoungSoo*/
686     if (sockInfo[fd].si_peerFd >= 0) {
687       sockInfo[sockInfo[fd].si_peerFd].si_peerFd = -1;
688     }
689   }
690   numSocksToClose = 0;
691 }
692 /*-----------------------------------------------------------------*/
693 static void
694 SocketReadyToRead(int fd)
695 {
696   SockInfo *si = &sockInfo[fd];
697   int spaceLeft;
698   FlowBuf *fb;
699   int res;
700
701   /* if peer is closed, close ourselves */
702   if (si->si_peerFd < 0 && (!si->si_needsHeaderSince)) {
703     CloseSock(fd);
704     return;
705   }
706
707   if ((fb = si->si_readBuf) == NULL) {
708     fb = si->si_readBuf = xcalloc(1, sizeof(FlowBuf));
709     fb->fb_refs = 1;
710     if (si->si_peerFd >= 0) {
711       sockInfo[si->si_peerFd].si_writeBuf = fb;
712       fb->fb_refs = 2;
713     }
714   }
715
716   if (fb->fb_buf == NULL)
717     fb->fb_buf = xmalloc(FB_ALLOCSIZE);
718
719   /* determine read buffer size - if 0, then block reads and return */
720   if ((spaceLeft = FB_SIZE - fb->fb_used) <= 0) {
721     if (si->si_needsHeaderSince) {
722       write(fd, err400BadRequest, strlen(err400BadRequest));
723       CloseSock(fd);
724       return;
725     }
726     else {
727       ClearFd(fd, &masterReadSet);
728       return;
729     }
730   } 
731   
732   /* read as much as allowed, and is available */
733   if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) {
734     CloseSock(fd);
735     if (fb->fb_used == 0 && si->si_peerFd >= 0) {
736       CloseSock(si->si_peerFd);
737       si->si_peerFd = -1;
738     }
739     return;
740   }
741   if (res == -1) {
742     if (errno == EAGAIN)
743       return;
744     TRACE("fd=%d errno=%d errstr=%s\n",fd, errno, strerror(errno));
745     CloseSock(fd);
746     if (fb->fb_used == 0 && si->si_peerFd >= 0) {
747       CloseSock(si->si_peerFd);
748       si->si_peerFd = -1;
749     }
750     return;
751   }
752   fb->fb_used += res;
753   fb->fb_buf[fb->fb_used] = 0;  /* terminate it for convenience */
754   //  printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
755
756   /* if we need header, check if we've gotten it. if so, do
757      modifications and continue. if not, check if we've read the
758      maximum, and if so, fail */
759   if (si->si_needsHeaderSince) {
760     int whichService;
761     SliceInfo *slice;
762
763 #define STATUS_REQ "GET /codemux/status.txt"
764     if (strncasecmp(fb->fb_buf, STATUS_REQ, sizeof(STATUS_REQ)-1) == 0) {
765       DumpStatus(fd);
766       CloseSock(fd);
767       return;
768     }
769
770     //    printf("trying to find service\n");
771     if (FindService(fb, &whichService, si->si_cliAddr) != SUCCESS)
772       return;
773     //    printf("found service %d\n", whichService);
774     slice = ServiceToSlice(whichService);
775
776     /* if it needs to be redirected to PLC, let it be handled here */
777     if (whichService == 0 && domainNamePLCNetflow != NULL &&
778         strcmp(slice->si_sliceName, "root") == 0) {
779       char msg[1024];
780       int len;
781       static const char* resp302 = 
782         "HTTP/1.0 302 Found\r\n"
783         "Location: http://%s\r\n"
784         "Cache-Control: no-cache, no-store\r\n"
785         "Content-type: text/html\r\n"
786         "Connection: close\r\n"
787         "\r\n"
788         "Your request is being redirected to PLC Netflow http://%s\n";
789       len = snprintf(msg, sizeof(msg), resp302, 
790                      domainNamePLCNetflow, domainNamePLCNetflow);
791       write(fd, msg, len);
792       CloseSock(fd);
793       return;
794     }
795     /* no service can have more than some absolute max number of
796        connections. Also, when we're too busy, start enforcing
797        fairness across the servers */
798     if (slice->si_numConns > SERVICE_MAX ||
799         (numTotalSliceConns > FAIRNESS_CUTOFF && 
800          slice->si_numConns > MAX_CONNS/numActiveSlices)) {
801       write(fd, err503TooBusy, strlen(err503TooBusy));
802       TRACE("CloseSock(): fd=%d too busy\n", fd);
803       CloseSock(fd);
804       return;
805     }
806
807     if (slice->si_xid > 0) {
808       static int first = 1;
809       setsockopt(fd, SOL_SOCKET, SO_SETXID, 
810                  &slice->si_xid, sizeof(slice->si_xid));
811       if (first) {
812         /* just to log it for once */
813         fprintf(stderr, "setsockopt() with XID = %d name = %s\n", 
814                 slice->si_xid, slice->si_sliceName);
815         first = 0;
816       }
817     }
818
819     si->si_needsHeaderSince = 0;
820     numNeedingHeaders--;
821     if (StartConnect(fd, whichService) != SUCCESS) {
822       write(fd, err503Unavailable, strlen(err503Unavailable));
823       TRACE("CloseSock(): fd=%d StartConnect() failed\n", fd);
824       CloseSock(fd);
825       return;
826     }
827     return;
828   }
829
830   /* write anything possible */
831   if (WriteAvailData(si->si_peerFd) != SUCCESS) {
832     /* assume the worst and close */
833     TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
834           fd, errno, strerror(errno));
835     CloseSock(fd);
836     if (si->si_peerFd >=0) {
837       CloseSock(si->si_peerFd);
838       si->si_peerFd = -1;
839     }
840   }
841 }
842 /*-----------------------------------------------------------------*/
843 static void
844 SocketReadyToWrite(int fd)
845 {
846   SockInfo *si = &sockInfo[fd];
847
848   /* unblock it and read what it has */
849   si->si_blocked = FALSE;
850   ClearFd(fd, &masterWriteSet);
851   SetFd(fd, &masterReadSet);
852   
853   /* enable reading on peer just in case it was off */
854   if (si->si_peerFd >= 0)
855     SetFd(si->si_peerFd, &masterReadSet);
856     
857   /* if we have data, write it */
858   if (WriteAvailData(fd) != SUCCESS) {
859    /* assume the worst and close */
860     TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
861           fd, errno, strerror(errno));
862     CloseSock(fd);
863     if (si->si_peerFd >= 0) {
864       CloseSock(si->si_peerFd);
865       si->si_peerFd = -1;
866     }
867     return;
868   }
869
870   /* if peer is closed and we're done writing, we should close */
871   if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0) {
872     CloseSock(fd);
873   }
874 }
875 /*-----------------------------------------------------------------*/
876 static void
877 CloseReqlessConns(void)
878 {
879   static int lastSweep;
880   int maxAge;
881   int i;
882
883   if (lastSweep == now)
884     return;
885   lastSweep = now;
886
887   if (numTotalSliceConns + numNeedingHeaders > MAX_CONNS ||
888       numNeedingHeaders > TARG_SETSIZE/20) {
889     /* second condition is probably an attack - close aggressively */
890     maxAge = 5;
891   }
892   else if (numTotalSliceConns + numNeedingHeaders > FAIRNESS_CUTOFF ||
893            numNeedingHeaders > TARG_SETSIZE/40) {
894     /* sweep a little aggressively */
895     maxAge = 10;
896   }
897   else if (numNeedingHeaders > TARG_SETSIZE/80) {
898     /* just sweep to close strays */
899     maxAge = 30;
900   }
901   else {
902     /* too little gained - not worth sweeping */
903     return;
904   }
905
906   /* if it's too old, close it */
907   for (i = 0; i < highestSetFd+1; i++) {
908     if (sockInfo[i].si_needsHeaderSince &&
909         (now - sockInfo[i].si_needsHeaderSince) > maxAge) 
910       CloseSock(i);
911   }
912 }
913 /*-----------------------------------------------------------------*/
914 static void
915 MainLoop(int lisSock)
916 {
917   int i;
918   OurFDSet tempReadSet, tempWriteSet;
919   int res;
920   int lastConfCheck = 0;
921
922   signal(SIGPIPE, SIG_IGN);
923
924   while (1) {
925     int newSock;
926     int ceiling;
927     struct timeval timeout;
928
929     now = time(NULL);
930
931     if (now - lastConfCheck > 300) {
932       ReadConfFile();
933       GetSliceXids();           /* always call - in case new slices created */
934       lastConfCheck = now;
935     }
936
937     /* see if there's any activity */
938     tempReadSet = masterReadSet;
939     tempWriteSet = masterWriteSet;
940
941     /* trim it down if needed */
942     while (highestSetFd > 1 &&
943            (!FD_ISSET(highestSetFd, &tempReadSet)) &&
944            (!FD_ISSET(highestSetFd, &tempWriteSet)))
945       highestSetFd--;
946     timeout.tv_sec = 1;
947     timeout.tv_usec = 0;
948     res = select(highestSetFd+1, (fd_set *) &tempReadSet, 
949                  (fd_set *) &tempWriteSet, NULL, &timeout);
950     if (res < 0 && errno != EINTR) {
951       perror("select");
952       exit(-1);
953     }
954
955     now = time(NULL);
956
957     /* clear the bit for listen socket to avoid confusion */
958     ClearFd(lisSock, &tempReadSet);
959     
960     ceiling = highestSetFd+1;   /* copy it, since it changes during loop */
961     /* pass data back and forth as needed */
962     for (i = 0; i < ceiling; i++) {
963       if (FD_ISSET(i, &tempWriteSet))
964         SocketReadyToWrite(i);
965     }
966     for (i = 0; i < ceiling; i++) {
967       if (FD_ISSET(i, &tempReadSet))
968         SocketReadyToRead(i);
969     }
970
971     /* see if we need to close conns w/o requests */
972     CloseReqlessConns();
973     
974     /* do all closes */
975     ReallyCloseSocks();
976
977     /* try accepting new connections */
978     do {
979       struct sockaddr_in addr;
980       socklen_t lenAddr = sizeof(addr);
981       if ((newSock = accept(lisSock, (struct sockaddr *) &addr, 
982                             &lenAddr)) >= 0) {
983         /* make socket non-blocking */
984         if (fcntl(newSock, F_SETFL, O_NONBLOCK) < 0) {
985           close(newSock);
986           continue;
987         }
988         memset(&sockInfo[newSock], 0, sizeof(SockInfo));
989         sockInfo[newSock].si_needsHeaderSince = now;
990         numNeedingHeaders++;
991         sockInfo[newSock].si_peerFd = -1;
992         sockInfo[newSock].si_cliAddr = addr.sin_addr;
993         sockInfo[newSock].si_whichService = -1;
994         SetFd(newSock, &masterReadSet);
995       }
996     } while (newSock >= 0);
997   }
998 }
999 /*-----------------------------------------------------------------*/
1000 static int 
1001 InitDaemon(void)
1002 {
1003   pid_t pid;
1004   FILE *pidfile;
1005   
1006   pidfile = fopen(PIDFILE, "w");
1007   if (pidfile == NULL) {
1008     fprintf(stderr, "%s creation failed\n", PIDFILE);
1009     return(-1);
1010   }
1011
1012   if ((pid = fork()) < 0) {
1013     fclose(pidfile);
1014     return(-1);
1015   }
1016   else if (pid != 0) {
1017     /* i'm the parent, writing down the child pid  */
1018     fprintf(pidfile, "%u\n", pid);
1019     fclose(pidfile);
1020     exit(0);
1021   }
1022
1023   /* close the pid file */
1024   fclose(pidfile);
1025
1026   /* routines for any daemon process
1027      1. create a new session 
1028      2. change directory to the root
1029      3. change the file creation permission 
1030   */
1031   setsid();
1032   chdir("/");
1033   umask(0);
1034
1035   return(0);
1036 }
1037 /*-----------------------------------------------------------------*/
1038 static int
1039 OpenLogFile(void)
1040 {
1041   static const char* logfile = "/var/log/codemux.log";
1042   int logfd;
1043
1044   logfd = open(logfile, O_WRONLY | O_APPEND | O_CREAT, 0600);
1045   if (logfd < 0) {
1046     fprintf(stderr, "cannot open the logfile err=%s\n",
1047             strerror(errno));
1048     exit(-1);
1049   }
1050
1051   /* duplicate logfile to stderr */
1052   if (dup2(logfd, STDERR_FILENO) != STDERR_FILENO) {
1053     fprintf(stderr, "cannot open the logfile err=%s\n",
1054             strerror(errno));
1055     exit(-1);
1056   }
1057   
1058   /* set the close-on-exec flag */
1059   if (fcntl(STDERR_FILENO, F_SETFD, 1) != 0) {
1060     fprintf(stderr, "fcntl to set the close-on-exec flag failed err=%s\n",
1061             strerror(errno));
1062     exit(-1);
1063   }
1064
1065   return logfd;
1066 }
1067 /*-----------------------------------------------------------------*/
1068 int
1069 main(int argc, char *argv[])
1070 {
1071   int lisSock;
1072   int logFd;
1073   int doDaemon = 1;
1074   int opt;
1075   struct in_addr lisAddress = { .s_addr = htonl(INADDR_ANY) };
1076
1077   while ((opt = getopt(argc, argv, "dl:")) != -1) {
1078     switch (opt) {
1079       case 'd':
1080         doDaemon = 0;
1081         break;
1082       case 'l':
1083         if (inet_pton(AF_INET, optarg, &lisAddress) <= 0) {
1084           fprintf(stderr, "`%s' is not a valid address\n", optarg);
1085           exit(-1);
1086         }
1087         break;
1088       default:
1089         fprintf(stderr, "Usage: %s [-d] [-l <listening address>]\n", argv[0]);
1090         exit(-1);
1091     }
1092   }
1093
1094   /* do the daemon stuff */
1095   if (doDaemon) {
1096     if (InitDaemon() < 0) {
1097       fprintf(stderr, "codemux daemon_init() failed\n");
1098       exit(-1);
1099     }
1100   }
1101
1102   /* create the accept socket */
1103   if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE,
1104                                            &lisAddress)) < 0) {
1105     fprintf(stderr, "failed creating accept socket\n");
1106     exit(-1);
1107   }
1108   SetFd(lisSock, &masterReadSet);
1109
1110   /* open the log file */
1111   logFd = OpenLogFile();
1112
1113
1114   /* write down the version */
1115   fprintf(stderr, "CoDemux version %s started\n", CODEMUX_VERSION);
1116
1117   while (1) {
1118     numForks++;
1119     if (fork()) {
1120       /* this is the parent - just wait */
1121       while (wait3(NULL, 0, NULL) < 1)
1122         ;                       /* just keep waiting for a real pid */
1123     }
1124     else {
1125       /* child process */
1126       MainLoop(lisSock);
1127       exit(-1);
1128     }
1129   }
1130 }
1131 /*-----------------------------------------------------------------*/