2 #include <sys/socket.h>
5 #include <netinet/in.h>
/* Path of the configuration file; the config reader further down stat()s and
   fopen()s it. */
17 #define CONF_FILE "/etc/codemux/codemux.conf"
/* Built-in three-word config line that the config reader feeds in on its
   first pass as a stand-in entry for apache. */
19 #define REAL_WEBSERVER_CONFLINE "* root 1080"
/* Sizes the per-fd tables (sockInfo, whichSocksToClose) and is the basis for
   the size of OurFDSet. */
20 #define TARG_SETSIZE 4096
22 /* set aside some small number of fds for us, allow the rest for connections */
24 #define MAX_CONNS ((TARG_SETSIZE-20)/2)
/* NOTE(review): 20 descriptors are held back and the remainder is halved,
   presumably because every proxied connection occupies two fds (client side
   and server side) -- confirm. */
26 /* no single service can take more than half the connections */
27 #define SERVICE_MAX (MAX_CONNS/2)
29 /* how many total connections before we get concerned about fairness */
31 #define FAIRNESS_CUTOFF (MAX_CONNS * 0.85)
/* The 0.85 factor makes FAIRNESS_CUTOFF a floating-point expression, so the
   integer counters compared against it are converted to double. */
/* Byte buffer shared by the two ends of a proxied connection: one socket
   fills it (its si_readBuf) while the peer drains it (its si_writeBuf).
   fb_refs counts the holders; it is incremented when a second socket is
   attached and tested against zero when buffers are released. */
34 typedef struct FlowBuf {
35 int fb_refs; /* num refs */
36 char *fb_buf; /* actual buffer */
37 int fb_used; /* bytes used in buffer */
/* Reads are limited to FB_SIZE bytes while the storage is malloc'ed with
   FB_ALLOCSIZE bytes, which leaves 200 bytes of slack for text that is
   inserted into the request afterwards. */
39 #define FB_SIZE 3800 /* max usable size */
40 #define FB_ALLOCSIZE 4000 /* extra to include IP address */
/* Per-descriptor connection state.  sockInfo[] is indexed by fd number;
   si_peerFd names the other half of a proxied pair and is negative when
   there is none. */
42 typedef struct SockInfo {
43 int si_peerFd; /* fd of peer */
44 struct in_addr si_cliAddr; /* address of client */
45 int si_blocked; /* are we blocked? */
46 int si_needsHeaderSince; /* since when are we waiting for a header */
/* si_needsHeaderSince doubles as a flag: 0 means no header is awaited,
   otherwise it holds the time stored when the connection was accepted. */
47 int si_whichService; /* index of service */
/* si_whichService is set to -1 while no service is attached to the fd. */
48 FlowBuf *si_readBuf; /* read data into this buffer */
49 FlowBuf *si_writeBuf; /* drain this buffer for writing */
52 static SockInfo sockInfo[TARG_SETSIZE]; /* fd number of peer socket */
/* One routing rule taken from the configuration.  serviceSig[] holds
   numServices entries; FindService searches indices 1 and up and falls back
   to the first entry. */
54 typedef struct ServiceSig {
55 char *ss_host; /* suffix in host */
58 int ss_slicePos; /* position in slices array */
61 static ServiceSig *serviceSig;
62 static int numServices;
/* st_mtime of CONF_FILE recorded after the last parse and compared with a
   fresh stat() to detect changes.
   NOTE(review): st_mtime is a time_t but is stored in an int here. */
63 static int confFileReadTime;
/* Bookkeeping for one slice.  Entries live in slices[] and are reached from
   a service through ServiceSig.ss_slicePos. */
66 typedef struct SliceInfo {
68 int si_inUse; /* do any services refer to this? */
73 static SliceInfo *slices;
/* Global connection accounting; numTotalSliceConns is adjusted by
   SliceConnsInc / SliceConnsDec. */
75 static int numActiveSlices;
76 static int numTotalSliceConns;
/* Set when some slice still lacks an xid; the /etc/passwd scan tests it
   before doing any work and recomputes it when done. */
77 static int anySliceXidsNeeded;
/* fd_set look-alike large enough for TARG_SETSIZE descriptors.  The code
   casts it to an fd_set pointer for select() and hands it to FD_ISSET
   directly, so the member name presumably has to match the libc's fd_set
   layout -- confirm when porting. */
79 typedef struct OurFDSet {
/* TARG_SETSIZE/32 elements yield exactly TARG_SETSIZE bits only when long is
   32 bits wide; with a 64-bit long the array is twice the needed size. */
80 long __fds_bits[TARG_SETSIZE/32];
/* Persistent interest sets; MainLoop copies them before every select(). */
82 static OurFDSet masterReadSet, masterWriteSet;
/* Upper bound on the fds present in the sets: raised by SetFd, trimmed in
   MainLoop. */
83 static int highestSetFd;
84 static int numNeedingHeaders; /* how many conns waiting on headers? */
/* SO_SETXID is mapped onto SO_PEERCRED; it is the option name passed to
   setsockopt() when a client socket is tagged with a slice xid. */
91 #define SO_SETXID SO_PEERCRED
93 /*-----------------------------------------------------------------*/
/* Maps a service index to the SliceInfo it is bound to by following
   serviceSig[whichService].ss_slicePos into slices[]. */
95 ServiceToSlice(int whichService)
99 return(&slices[serviceSig[whichService].ss_slicePos]);
101 /*-----------------------------------------------------------------*/
/* Status text: one block of global counters, then one line per slice and one
   line per service.  Each sprintf() writes at `start`, which is then advanced
   by strlen(start) so the next piece is appended after it.
   NOTE(review): these sprintf() calls are unbounded and the destination size
   is not checked here -- verify it can hold numSlices + numServices lines, or
   switch to snprintf. */
111 "numForks %d, numActiveSlices %d, numTotalSliceConns %d\n"
112 "numNeedingHeaders %d, anySliceXidsNeeded %d\n",
113 numForks, numActiveSlices, numTotalSliceConns,
114 numNeedingHeaders, anySliceXidsNeeded);
115 start += strlen(start);
/* per-slice lines */
117 for (i = 0; i < numSlices; i++) {
118 SliceInfo *si = &slices[i];
119 sprintf(start, "Slice %d: %s xid %d, %d conns, inUse %d\n",
120 i, si->si_sliceName, si->si_xid, si->si_numConns,
122 start += strlen(start);
/* per-service lines; ss_port is cast to int to match the %d conversion */
125 for (i = 0; i < numServices; i++) {
126 ServiceSig *ss = &serviceSig[i];
127 sprintf(start, "Service %d: %s %s port %d, slice# %d\n", i, ss->ss_host,
128 ss->ss_slice, (int) ss->ss_port, ss->ss_slicePos);
129 start += strlen(start);
135 /*-----------------------------------------------------------------*/
/* Fills in si_xid for slices by scanning /etc/passwd.  anySliceXidsNeeded is
   tested before any work is done and is recomputed at the end from the slices
   that are in use but still have xid 0. */
139 /* walks through /etc/passwd, and gets the uid for every slice we
145 if (!anySliceXidsNeeded)
148 for (i = 0; i < numSlices; i++) {
149 SliceInfo *si = &slices[i];
152 for (i = 0; i < numServices; i++) {
153 SliceInfo *si = ServiceToSlice(i);
158 if ((f = fopen("/etc/passwd", "r")) == NULL)
161 while ((line = GetNextLine(f)) != NULL) {
165 if ((temp = strchr(line, ':')) == NULL)
166 continue; /* weird line */
167 *temp = '\0'; /* terminate slice name */
/* skip the second ':'-separated field; the number after the second ':' is
   parsed with atoi() and values below 1 are rejected */
169 if ((temp = strchr(temp+1, ':')) == NULL)
170 continue; /* weird line */
171 if ((xid = atoi(temp+1)) < 1)
172 continue; /* weird xid */
174 /* we've got a slice name and xid, let's try to match */
/* only slices whose xid is still 0 are candidates; names are compared
   case-insensitively */
175 for (i = 0; i < numSlices; i++) {
176 if (slices[i].si_xid == 0 &&
177 strcasecmp(slices[i].si_sliceName, line) == 0) {
178 slices[i].si_xid = xid;
184 /* assume service 0 is the root service, and don't check it since
185 it'll have xid zero */
186 anySliceXidsNeeded = FALSE;
/* the scan starts at index 1 of slices[], so entry 0 never sets the flag */
187 for (i = 1; i < numSlices; i++) {
188 if (slices[i].si_xid == 0 && slices[i].si_inUse > 0) {
189 anySliceXidsNeeded = TRUE;
196 /*-----------------------------------------------------------------*/
/* Accounts for one more connection on the slice behind whichService: the
   global total is incremented and the slice's own count is tested for the
   value 1, i.e. its first connection. */
198 SliceConnsInc(int whichService)
200 SliceInfo *si = ServiceToSlice(whichService);
204 numTotalSliceConns++;
206 if (si->si_numConns == 1)
209 /*-----------------------------------------------------------------*/
/* Counterpart of SliceConnsInc: the global total is decremented and the
   slice's own count is tested for the value 0, i.e. its last connection
   has gone. */
211 SliceConnsDec(int whichService)
213 SliceInfo *si = ServiceToSlice(whichService);
217 numTotalSliceConns--;
219 if (si->si_numConns == 0)
222 /*-----------------------------------------------------------------*/
/* Looks `slice` up case-insensitively in slices[].  When it is absent, the
   slot at index numSlices is zeroed and given a strdup'ed copy of the name;
   the array capacity grows as max(8, 2 * current) when it is full. */
224 WhichSlicePos(char *slice)
226 /* adds the new slice if necessary, returns the index into slice
227 array. Never change the ordering of existing slices */
/* capacity of slices[]; static so it persists across calls */
229 static int numSlicesAlloc;
231 for (i = 0; i < numSlices; i++) {
232 if (strcasecmp(slice, slices[i].si_sliceName) == 0)
236 if (numSlices >= numSlicesAlloc) {
237 numSlicesAlloc = MAX(8, numSlicesAlloc * 2);
/* NOTE(review): the realloc() result overwrites slices directly and no NULL
   check is visible here (likewise for strdup() below) -- confirm the
   out-of-memory policy. */
238 slices = realloc(slices, numSlicesAlloc * sizeof(SliceInfo));
241 memset(&slices[numSlices], 0, sizeof(SliceInfo));
242 slices[numSlices].si_sliceName = strdup(slice);
246 /*-----------------------------------------------------------------*/
/* Configuration reader: stat()s CONF_FILE, compares its mtime with the one
   recorded after the previous parse, and builds a fresh ServiceSig table in
   `servs` from three-word lines.  The old table's strings are freed and the
   new mtime is recorded at the end. */
252 ServiceSig *servs = NULL;
258 if (stat(CONF_FILE, &statBuf) != 0) {
259 fprintf(stderr, "failed stat on codemux.conf\n");
/* compare against the mtime saved by the previous parse */
264 if (statBuf.st_mtime == confFileReadTime)
267 if ((f = fopen(CONF_FILE, "r")) == NULL) {
268 fprintf(stderr, "failed reading codemux.conf\n");
274 /* conf file entries look like
275 coblitz.codeen.org princeton_coblitz 3125
284 /* on the first pass, put in a fake entry for apache */
286 line = strdup(REAL_WEBSERVER_CONFLINE);
288 if ((line = GetNextLine(f)) == NULL)
292 memset(&serv, 0, sizeof(serv));
/* a usable line has exactly three words; field 2 is parsed as the port */
293 if (WordCount(line) != 3) {
294 fprintf(stderr, "bad line: %s\n", line);
297 serv.ss_port = port = atoi(GetField(line, 2));
/* reject ports outside 1..65535 and the demultiplexer's own DEMUX_PORT */
298 if (port < 1 || port > 65535 || port == DEMUX_PORT) {
299 fprintf(stderr, "bad port: %s\n", line);
303 serv.ss_host = GetWord(line, 0);
304 serv.ss_slice = GetWord(line, 1);
305 if (num >= numAlloc) {
306 numAlloc = MAX(numAlloc * 2, 8);
/* NOTE(review): realloc() result is assigned straight back to servs with no
   NULL check visible here -- confirm the out-of-memory policy. */
307 servs = realloc(servs, numAlloc * sizeof(ServiceSig));
309 serv.ss_slicePos = WhichSlicePos(serv.ss_slice);
310 if (slices[serv.ss_slicePos].si_inUse == 0 &&
311 slices[serv.ss_slicePos].si_xid < 1)
312 anySliceXidsNeeded = TRUE; /* if new/inactive, we need xid */
320 if (numServices == 0) {
321 fprintf(stderr, "nothing found in codemux.conf\n");
/* release the host and slice strings owned by the current serviceSig table */
327 for (i = 0; i < numServices; i++) {
328 free(serviceSig[i].ss_host);
329 free(serviceSig[i].ss_slice);
/* remember which version of the file this table reflects */
334 confFileReadTime = statBuf.st_mtime;
336 /*-----------------------------------------------------------------*/
/* Canned HTTP responses that SocketReadyToRead write()s to the client.
   This one is sent when a connection still waiting for its header has no
   buffer space left. */
337 static char *err400BadRequest =
338 "HTTP/1.0 400 Bad Request\r\n"
339 "Content-Type: text/html\r\n"
341 "You are trying to access a PlanetLab node, and your\n"
342 "request header exceeded the allowable size. Please\n"
343 "try again if you believe this error is temporary.\n";
344 /*-----------------------------------------------------------------*/
/* Sent when StartConnect() does not return SUCCESS. */
345 static char *err503Unavailable =
346 "HTTP/1.0 503 Service Unavailable\r\n"
347 "Content-Type: text/html\r\n"
349 "You are trying to access a PlanetLab node, but the service\n"
350 "seems to be unavailable at the moment. Please try again.\n";
351 /*-----------------------------------------------------------------*/
/* Sent when the per-service or fairness connection limits are exceeded. */
352 static char *err503TooBusy =
353 "HTTP/1.0 503 Service Unavailable\r\n"
354 "Content-Type: text/html\r\n"
356 "You are trying to access a PlanetLab node, but the service\n"
357 "seems to be overloaded at the moment. Please try again.\n";
358 /*-----------------------------------------------------------------*/
/* Marks fd in *set and keeps highestSetFd at least as large as fd.  The
   high-water mark is updated for whichever set is passed in. */
360 SetFd(int fd, OurFDSet *set)
362 if (highestSetFd < fd)
366 /*-----------------------------------------------------------------*/
/* Counterpart of SetFd; callers use it to drop fd from a master or temporary
   set. */
368 ClearFd(int fd, OurFDSet *set)
372 /*-----------------------------------------------------------------*/
/* Cuts one header line out of two parallel copies of the same text.
   lower     - copy that is searched for "\n" + header (FindService passes a
               lowercased copy and a lowercase header name)
   real      - the text as received; the same byte range is removed from it
               so offsets in both copies stay aligned
   totalSize - byte count used for the tail move; FindService passes
               fb_used + 1, which includes the terminating NUL
   header    - header name to look for */
374 RemoveHeader(char *lower, char *real, int totalSize, char *header)
376 /* returns number of characters removed */
/* NOTE(review): unbounded sprintf into h2; its size is not visible here. */
381 sprintf(h2, "\n%s", header);
383 if ((conn = strstr(lower, h2)) == NULL)
387 /* determine how many characters to remove */
/* the length covers the line's own '\n', or the terminating NUL when the
   header is the last line */
388 if ((temp = strchr(conn, '\n')) != NULL)
389 len = (temp - conn) + 1;
391 len = strlen(conn) + 1;
392 start = conn - lower;
/* close the gap in both copies; memmove because the ranges overlap */
394 memmove(&real[start], &real[end], totalSize - end);
395 memmove(&lower[start], &lower[end], totalSize - end);
399 /*-----------------------------------------------------------------*/
/* Splices header followed by "\r\n" into buf at a position derived from the
   first '\n' (or from the terminating NUL when buf holds a single line),
   shifting the remainder of the buffer up by the inserted length.
   totalSize is the number of valid bytes in buf; FindService passes
   fb_used + 1 so that the terminating NUL is moved as well.  The caller must
   guarantee room for the extra bytes. */
401 InsertHeader(char *buf, int totalSize, char *header)
403 /* returns number of bytes inserted */
/* NOTE(review): unbounded sprintf into h2; its size is not visible here. */
409 sprintf(h2, "%s\r\n", header);
412 /* if we don't encounter a \n, it means that we have only a single
413 line, and we'd converted the \n to a \0 */
414 if ((temp = strchr(buf, '\n')) == NULL)
415 temp = strchr(buf, '\0');
/* open a gap of len bytes at temp, then drop the new header line into it */
418 memmove(temp + len, temp, totalSize - (temp - buf));
419 memcpy(temp, h2, len);
423 /*-----------------------------------------------------------------*/
/* Chooses the service that should receive the request held in fb and
   rewrites the request header in place.
   fb           - buffer holding the NUL-terminated request read so far
   whichService - out parameter for the chosen service index
   addr         - client address, emitted as an X-CoDemux-Client header
   Matching order: the Host header against each service's ss_host, then the
   URL's leading path component against each service's ss_prefix (which is
   stripped from the request on a match), otherwise the first service.
   Both searches start at index 1. */
425 FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
430 char *buf = fb->fb_buf;
/* nothing can be decided until the blank line that ends the header is in */
438 if (strstr(buf, "\n\r\n") == NULL && strstr(buf, "\n\n") == NULL)
441 /* insert client info after first line */
442 sprintf(orig, "X-CoDemux-Client: %s", inet_ntoa(addr));
/* fb_used + 1 so that the terminating NUL is shifted along with the data */
443 fb->fb_used += InsertHeader(buf, fb->fb_used + 1, orig);
445 /* get just the header, so we can work on it */
446 LOCAL_STR_DUP_LOWER(lowerBuf, buf);
447 if ((end = strstr(lowerBuf, "\n\r\n")) == NULL)
448 end = strstr(lowerBuf, "\n\n");
451 /* remove any existing connection, keep-alive headers, add ours */
452 fb->fb_used -= RemoveHeader(lowerBuf, buf, fb->fb_used + 1, "keep-alive:");
453 fb->fb_used -= RemoveHeader(lowerBuf, buf, fb->fb_used + 1, "connection:");
454 fb->fb_used += InsertHeader(buf, fb->fb_used + 1, "Connection: close");
/* mirror the insertion in the lowercased copy so offsets keep matching buf.
   NOTE(review): fb_used already includes the bytes just added to buf, and
   lowerBuf must have room for the extra line -- verify the allocation made
   by LOCAL_STR_DUP_LOWER. */
455 InsertHeader(lowerBuf, fb->fb_used + 1, "connection: close");
457 /* isolate host, see if it matches */
458 if ((hostVal = strstr(lowerBuf, "\nhost:")) != NULL) {
460 hostVal += strlen("\nhost:");
461 if ((end = strchr(hostVal, '\n')) != NULL)
463 if ((end = strchr(hostVal, ':')) != NULL)
/* request bytes are untrusted and may be >= 0x80; <ctype.h> functions need a
   value representable as unsigned char, so cast before classifying */
465 while (isspace((unsigned char) *hostVal))
467 if (strlen(hostVal) > 0) {
468 hostVal = GetWord(hostVal, 0);
/* index 0 is skipped here; it is the fallback chosen at the end */
469 for (i = 1; i < numServices; i++) {
470 if (serviceSig[i].ss_host != NULL &&
471 DoesDotlessSuffixMatch(hostVal, 0, serviceSig[i].ss_host)) {
474 /* printf("%s", buf); */
483 /* see if URL prefix matches */
484 if ((end = strchr(lowerBuf, '\n')) != NULL)
486 if ((url = GetField(lowerBuf, 1)) == NULL ||
488 /* bad request - let apache handle it ? */
492 url++; /* skip the leading slash */
/* a prefix matches only when it is followed by ' ' or '/', i.e. it is a
   whole first path component */
493 for (i = 1; i < numServices; i++) {
494 if (serviceSig[i].ss_prefix != NULL &&
495 (len = strlen(serviceSig[i].ss_prefix)) > 0 &&
496 strncmp(url, serviceSig[i].ss_prefix, len) == 0 &&
497 (url[len] == ' ' || url[len] == '/')) {
/* url points into lowerBuf; the same offset is applied to buf */
498 int startPos = url - lowerBuf;
499 int stripLen = len + ((url[len] == '/') ? 1 : 0);
500 /* strip out prefix */
501 fb->fb_used -= stripLen;
/* + 1 moves the terminating NUL together with the tail */
502 memmove(&buf[startPos], &buf[startPos+stripLen],
503 fb->fb_used + 1 - startPos);
504 /* printf("%s", buf); */
511 /* default to first service */
515 /*-----------------------------------------------------------------*/
/* Begins a non-blocking TCP connect to the loopback address on the chosen
   service's port, on behalf of the client socket origFD.  The new socket is
   registered in masterWriteSet and becomes origFD's peer; it shares origFD's
   read buffer as its own write buffer, which takes an extra reference.
   The connect() result is tested so that only -1 with errno == EINPROGRESS
   skips the guarded branch that follows the call. */
517 StartConnect(int origFD, int whichService)
520 struct sockaddr_in dest;
524 if ((sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) {
528 /* make socket non-blocking */
529 if (fcntl(sock, F_SETFL, O_NONBLOCK) < 0) {
534 /* set addr structure */
535 memset(&dest, 0, sizeof(dest));
536 dest.sin_family = AF_INET;
537 dest.sin_port = htons(serviceSig[whichService].ss_port);
538 dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
540 /* start connection process - we should be told that it's in
542 if (connect(sock, (struct sockaddr *) &dest, sizeof(dest)) != -1 ||
543 errno != EINPROGRESS) {
548 SetFd(sock, &masterWriteSet); /* determine when connect finishes */
/* link the two descriptors to each other */
549 sockInfo[origFD].si_peerFd = sock;
550 si = &sockInfo[sock];
551 memset(si, 0, sizeof(SockInfo));
552 si->si_peerFd = origFD;
553 si->si_blocked = TRUE; /* still connecting */
554 si->si_whichService = whichService;
/* the request already read from the client becomes this socket's output;
   the buffer now has a second holder */
555 si->si_writeBuf = sockInfo[origFD].si_readBuf;
556 sockInfo[origFD].si_readBuf->fb_refs++;
557 if (whichService >= 0)
558 SliceConnsInc(whichService);
562 /*-----------------------------------------------------------------*/
/* Attempts a single write() of fd's pending output (si_writeBuf).  An empty
   buffer or a socket marked blocked is tested for first.  After a partial
   write, or an EAGAIN failure, the socket is marked blocked and added to
   masterWriteSet so that select() reports when it is writable again. */
564 WriteAvailData(int fd)
566 SockInfo *si = &sockInfo[fd];
567 FlowBuf *fb = si->si_writeBuf;
570 /* printf("trying to write fd %d\n", fd); */
571 if (fb->fb_used < 1 || si->si_blocked)
574 /* printf("trying to write %d bytes\n", fb->fb_used); */
575 /* write(STDOUT_FILENO, fb->fb_buf, fb->fb_used); */
576 if ((res = write(fd, fb->fb_buf, fb->fb_used)) > 0) {
578 if (fb->fb_used > 0) {
579 /* couldn't write all - assume blocked */
/* slide fb_used bytes from offset res down to the start of the buffer */
580 memmove(fb->fb_buf, &fb->fb_buf[res], fb->fb_used);
581 si->si_blocked = TRUE;
582 SetFd(fd, &masterWriteSet);
584 /* printf("wrote %d\n", res); */
588 /* we might have been full but didn't realize it */
589 if (res == -1 && errno == EAGAIN) {
590 si->si_blocked = TRUE;
591 SetFd(fd, &masterWriteSet);
595 /* otherwise, assume the worst */
598 /*-----------------------------------------------------------------*/
/* Deferred-close queue.  whichSocksToClose[] lists numSocksToClose fds that
   ReallyCloseSocks will process; socksToCloseVec is a bitmap of the fds
   already queued. */
599 static OurFDSet socksToCloseVec;
600 static int numSocksToClose;
601 static int whichSocksToClose[TARG_SETSIZE];
602 /*-----------------------------------------------------------------*/
/* Deferred-close request: fd is first tested against socksToCloseVec, then
   flagged there and stored at the tail of whichSocksToClose[].
   Note that SetFd also raises highestSetFd when fd exceeds it. */
606 if (FD_ISSET(fd, &socksToCloseVec))
608 SetFd(fd, &socksToCloseVec);
609 whichSocksToClose[numSocksToClose] = fd;
612 /*-----------------------------------------------------------------*/
/* the guarded cleanup runs only once no holder of the buffer remains */
619 if (buf->fb_refs == 0) {
624 /*-----------------------------------------------------------------*/
/* Drains the deferred-close queue.  For every queued fd it releases the
   read and write buffers through DecBuf, removes the fd from both master
   sets, resets the waiting-for-header state, and gives back the slice
   connection count if a service was attached. */
626 ReallyCloseSocks(void)
/* forget which fds were queued so they can be queued again later */
630 memset(&socksToCloseVec, 0, sizeof(socksToCloseVec));
632 for (i = 0; i < numSocksToClose; i++) {
633 int fd = whichSocksToClose[i];
635 DecBuf(sockInfo[fd].si_readBuf);
636 DecBuf(sockInfo[fd].si_writeBuf);
637 ClearFd(fd, &masterReadSet);
638 ClearFd(fd, &masterWriteSet);
639 if (sockInfo[fd].si_needsHeaderSince) {
640 sockInfo[fd].si_needsHeaderSince = 0;
/* -1 marks the slot as no longer attached to a service, so the count is
   released only once */
643 if (sockInfo[fd].si_whichService >= 0) {
644 SliceConnsDec(sockInfo[fd].si_whichService);
645 sockInfo[fd].si_whichService = -1;
650 /*-----------------------------------------------------------------*/
/* Handles a readable descriptor reported by select().
   Data is read into the socket's FlowBuf, which is allocated on first use
   and shared with the peer as its write buffer.  While the connection is
   still waiting for its request header (si_needsHeaderSince != 0) the
   buffer is inspected: a status request is recognised by its prefix,
   otherwise FindService picks the target, connection limits are enforced,
   the socket is tagged with the slice xid and StartConnect opens the
   server side.  Finally any buffered data is pushed to the peer. */
652 SocketReadyToRead(int fd)
654 SockInfo *si = &sockInfo[fd];
659 /* if peer is closed, close ourselves */
660 if (si->si_peerFd < 0 && (!si->si_needsHeaderSince)) {
/* first read on this fd: create the buffer and, if a peer already exists,
   make it the peer's output buffer.
   NOTE(review): no NULL check on calloc()/malloc() is visible here. */
665 if ((fb = si->si_readBuf) == NULL) {
666 fb = si->si_readBuf = calloc(1, sizeof(FlowBuf));
668 if (si->si_peerFd >= 0) {
669 sockInfo[si->si_peerFd].si_writeBuf = fb;
674 if (fb->fb_buf == NULL)
675 fb->fb_buf = malloc(FB_ALLOCSIZE);
677 /* determine read buffer size - if 0, then block reads and return */
/* <= 0, not < 0: with exactly zero bytes free, read() would be called with a
   zero count, return 0, and be mistaken for end-of-file below.  A full
   buffer either fails the request (still waiting for the header) or stops
   read polling on this fd. */
678 if ((spaceLeft = FB_SIZE - fb->fb_used) <= 0) {
679 if (si->si_needsHeaderSince) {
680 write(fd, err400BadRequest, strlen(err400BadRequest));
685 ClearFd(fd, &masterReadSet);
690 /* read as much as allowed, and is available */
/* a return of 0 means the other side closed; the peer is closed as well
   when nothing is left to forward */
691 if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) {
693 if (fb->fb_used == 0 && si->si_peerFd >= 0) {
694 CloseSock(si->si_peerFd);
703 if (fb->fb_used == 0 && si->si_peerFd >= 0) {
704 CloseSock(si->si_peerFd);
710 fb->fb_buf[fb->fb_used] = 0; /* terminate it for convenience */
711 printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
713 /* if we need header, check if we've gotten it. if so, do
714 modifications and continue. if not, check if we've read the
715 maximum, and if so, fail */
716 if (si->si_needsHeaderSince) {
/* sizeof - 1 compares the literal without its terminating NUL, so this is a
   case-insensitive prefix match */
720 #define STATUS_REQ "GET /codemux/status.txt"
721 if (strncasecmp(fb->fb_buf, STATUS_REQ, sizeof(STATUS_REQ)-1) == 0) {
727 printf("trying to find service\n");
728 if (FindService(fb, &whichService, si->si_cliAddr) != SUCCESS)
730 printf("found service %d\n", whichService);
731 slice = ServiceToSlice(whichService);
733 /* no service can have more than some absolute max number of
734 connections. Also, when we're too busy, start enforcing
735 fairness across the servers */
/* the division by numActiveSlices is only evaluated when
   numTotalSliceConns exceeds FAIRNESS_CUTOFF */
736 if (slice->si_numConns > SERVICE_MAX ||
737 (numTotalSliceConns > FAIRNESS_CUTOFF &&
738 slice->si_numConns > MAX_CONNS/numActiveSlices)) {
739 write(fd, err503TooBusy, strlen(err503TooBusy));
/* tag the client socket with the slice's xid; the setsockopt() result is
   not examined */
744 if (slice->si_xid > 0) {
745 setsockopt(fd, SOL_SOCKET, SO_SETXID,
746 &slice->si_xid, sizeof(slice->si_xid));
747 fprintf(stderr, "setsockopt() with XID = %d name = %s\n",
748 slice->si_xid, slice->si_sliceName);
/* header handled: the connection no longer counts as waiting */
751 si->si_needsHeaderSince = 0;
753 if (StartConnect(fd, whichService) != SUCCESS) {
754 write(fd, err503Unavailable, strlen(err503Unavailable));
761 /* write anything possible */
762 if (WriteAvailData(si->si_peerFd) != SUCCESS) {
763 /* assume the worst and close */
765 CloseSock(si->si_peerFd);
769 /*-----------------------------------------------------------------*/
/* Handles a writable descriptor reported by select().  Sockets are put in
   masterWriteSet while a connect is pending or after a short/EAGAIN write,
   so becoming writable clears the blocked flag, stops write polling, turns
   read polling back on for fd and its peer, and flushes pending output. */
771 SocketReadyToWrite(int fd)
773 SockInfo *si = &sockInfo[fd];
775 /* unblock it and read what it has */
776 si->si_blocked = FALSE;
777 ClearFd(fd, &masterWriteSet);
778 SetFd(fd, &masterReadSet);
780 /* enable reading on peer just in case it was off */
781 if (si->si_peerFd >= 0)
782 SetFd(si->si_peerFd, &masterReadSet);
784 /* if we have data, write it */
785 if (WriteAvailData(fd) != SUCCESS) {
786 /* assume the worst and close */
788 if (si->si_peerFd >= 0) {
789 CloseSock(si->si_peerFd);
795 /* if peer is closed and we're done writing, we should close */
/* NOTE(review): si_writeBuf is dereferenced without a NULL test here --
   confirm that a write buffer always exists at this point. */
796 if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0)
799 /*-----------------------------------------------------------------*/
/* Sweeps connections that have been waiting for a request header longer
   than maxAge.  How aggressive the sweep is depends on the current load.
   With TARG_SETSIZE at 4096 the numNeedingHeaders thresholds below are
   204, 102 and 51. */
801 CloseReqlessConns(void)
/* value of `now` at the previous call; equal values are tested for first */
803 static int lastSweep;
807 if (lastSweep == now)
811 if (numTotalSliceConns + numNeedingHeaders > MAX_CONNS ||
812 numNeedingHeaders > TARG_SETSIZE/20) {
813 /* second condition is probably an attack - close aggressively */
816 else if (numTotalSliceConns + numNeedingHeaders > FAIRNESS_CUTOFF ||
817 numNeedingHeaders > TARG_SETSIZE/40) {
818 /* sweep a little aggressively */
821 else if (numNeedingHeaders > TARG_SETSIZE/80) {
822 /* just sweep to close strays */
826 /* too little gained - not worth sweeping */
830 /* if it's too old, close it */
/* a non-zero si_needsHeaderSince is the time the wait began */
831 for (i = 0; i < highestSetFd+1; i++) {
832 if (sockInfo[i].si_needsHeaderSince &&
833 (now - sockInfo[i].si_needsHeaderSince) > maxAge)
837 /*-----------------------------------------------------------------*/
/* Event loop.  Each iteration snapshots the master fd sets, select()s on
   the copies, services writable descriptors and then readable ones, and
   finally accept()s new clients on lisSock.  A configuration check is gated
   on `now - lastConfCheck > 300`. */
839 MainLoop(int lisSock)
842 OurFDSet tempReadSet, tempWriteSet;
844 int lastConfCheck = 0;
/* with SIGPIPE ignored, writing to a closed peer fails with an error
   instead of terminating the process */
846 signal(SIGPIPE, SIG_IGN);
851 struct timeval timeout;
855 if (now - lastConfCheck > 300) {
857 GetSliceXids(); /* always call - in case new slices created */
861 /* see if there's any activity */
/* select() overwrites its sets, so it is given copies */
862 tempReadSet = masterReadSet;
863 tempWriteSet = masterWriteSet;
865 /* trim it down if needed */
866 while (highestSetFd > 1 &&
867 (!FD_ISSET(highestSetFd, &tempReadSet)) &&
868 (!FD_ISSET(highestSetFd, &tempWriteSet)))
/* OurFDSet is passed off as fd_set; this relies on the layouts agreeing */
872 res = select(highestSetFd+1, (fd_set *) &tempReadSet,
873 (fd_set *) &tempWriteSet, NULL, &timeout);
874 if (res < 0 && errno != EINTR) {
881 /* clear the bit for listen socket to avoid confusion */
882 ClearFd(lisSock, &tempReadSet);
884 ceiling = highestSetFd+1; /* copy it, since it changes during loop */
885 /* pass data back and forth as needed */
/* writable descriptors are serviced before readable ones */
886 for (i = 0; i < ceiling; i++) {
887 if (FD_ISSET(i, &tempWriteSet))
888 SocketReadyToWrite(i);
890 for (i = 0; i < ceiling; i++) {
891 if (FD_ISSET(i, &tempReadSet))
892 SocketReadyToRead(i);
895 /* see if we need to close conns w/o requests */
901 /* try accepting new connections */
/* NOTE(review): newSock indexes sockInfo[] directly; no comparison against
   TARG_SETSIZE is visible here -- confirm one exists. */
903 struct sockaddr_in addr;
904 socklen_t lenAddr = sizeof(addr);
905 if ((newSock = accept(lisSock, (struct sockaddr *) &addr,
907 memset(&sockInfo[newSock], 0, sizeof(SockInfo));
/* a fresh client starts out waiting for its header, with no peer and no
   service */
908 sockInfo[newSock].si_needsHeaderSince = now;
910 sockInfo[newSock].si_peerFd = -1;
911 sockInfo[newSock].si_cliAddr = addr.sin_addr;
912 sockInfo[newSock].si_whichService = -1;
913 SetFd(newSock, &masterReadSet);
/* keep accepting until accept() stops returning a descriptor */
915 } while (newSock >= 0);
918 /*-----------------------------------------------------------------*/
/* Program entry: creates the listening socket on DEMUX_PORT and registers
   it in masterReadSet.  A parent-side loop spins on wait3() until it returns
   a value of 1 or more. */
920 main(int argc, char *argv[])
924 if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE)) < 0) {
925 fprintf(stderr, "failed creating accept socket\n");
928 SetFd(lisSock, &masterReadSet);
933 /* this is the parent - just wait */
934 while (wait3(NULL, 0, NULL) < 1)
935 ; /* just keep waiting for a real pid */
944 /*-----------------------------------------------------------------*/