Setting tag codemux-0.1-15
[codemux.git] / codemux.c
index 3ca9ba0..0b6233c 100644 (file)
--- a/codemux.c
+++ b/codemux.c
 #include <unistd.h>
 #include <string.h>
 #include "codemuxlib.h"
+#include "debug.h"
+
+#ifdef DEBUG
+HANDLE hdebugLog;
+int defaultTraceSync;
+#endif
 
 #define CONF_FILE "/etc/codemux/codemux.conf"
 #define DEMUX_PORT 80
@@ -31,6 +37,8 @@
    among them */
 #define FAIRNESS_CUTOFF (MAX_CONNS * 0.85)
 
+/* codemux version, from Makefile, or specfile */
+#define CODEMUX_VERSION RPM_VERSION
 
 typedef struct FlowBuf {
   int fb_refs;                 /* num refs */
@@ -56,6 +64,7 @@ typedef struct ServiceSig {
   char *ss_host;               /* suffix in host */
   char *ss_slice;
   short ss_port;
+  char *ss_ip;
   int ss_slicePos;             /* position in slices array */
 } ServiceSig;
 
@@ -86,6 +95,9 @@ static int numNeedingHeaders; /* how many conns waiting on headers? */
 
 static int numForks;
 
+/* PLC netflow domain name like netflow.planet-lab.org */
+static char* domainNamePLCNetflow = NULL;
+
 #ifndef SO_SETXID
 #define SO_SETXID SO_PEERCRED
 #endif
@@ -107,8 +119,10 @@ DumpStatus(int fd)
   int len;
 
   sprintf(start, 
+         "CoDemux version %s\n"
          "numForks %d, numActiveSlices %d, numTotalSliceConns %d\n"
          "numNeedingHeaders %d, anySliceXidsNeeded %d\n",
+         CODEMUX_VERSION,
          numForks, numActiveSlices, numTotalSliceConns,
          numNeedingHeaders, anySliceXidsNeeded);
   start += strlen(start);
@@ -162,14 +176,14 @@ GetSliceXids(void)
     int xid;
 
     if ((temp = strchr(line, ':')) == NULL)
-      continue;                        /* weird line */
+      goto next_line;                  /* weird line */
     *temp = '\0';              /* terminate slice name */
     temp++;
     if ((temp = strchr(temp+1, ':')) == NULL)
-      continue;                        /* weird line */
+      goto next_line;  /* weird line */
     if ((xid = atoi(temp+1)) < 1)
-      continue;                        /* weird xid */
-
+      goto next_line;  /* weird xid */
+    
     /* we've got a slice name and xid, let's try to match */
     for (i = 0; i < numSlices; i++) {
       if (slices[i].si_xid == 0 &&
@@ -178,6 +192,9 @@ GetSliceXids(void)
        break;
       }
     }
+  next_line:
+    if (line)
+      xfree(line);
   }
 
   /* assume service 0 is the root service, and don't check it since
@@ -234,11 +251,11 @@ WhichSlicePos(char *slice)
 
   if (numSlices >= numSlicesAlloc) {
     numSlicesAlloc = MAX(8, numSlicesAlloc * 2);
-    slices = realloc(slices, numSlicesAlloc * sizeof(SliceInfo));
+    slices = xrealloc(slices, numSlicesAlloc * sizeof(SliceInfo));
   }
 
   memset(&slices[numSlices], 0, sizeof(SliceInfo));
-  slices[numSlices].si_sliceName = strdup(slice);
+  slices[numSlices].si_sliceName = xstrdup(slice);
   numSlices++;
   return(numSlices-1);
 }
@@ -278,13 +295,13 @@ ReadConfFile(void)
     ServiceSig serv;
     int port;
     if (line != NULL)
-      free(line);
+      xfree(line);
     
     if ((line = GetNextLine(f)) == NULL)
       break;
 
     memset(&serv, 0, sizeof(serv));
-    if (WordCount(line) != 3) {
+    if (WordCount(line) < 3) {
       fprintf(stderr, "bad line: %s\n", line);
       continue;
     }
@@ -296,16 +313,25 @@ ReadConfFile(void)
 
     serv.ss_host = GetWord(line, 0);
     serv.ss_slice = GetWord(line, 1);
-
-    if (num == 0 && /* the first row must be an entry for apache */
-       (strcmp(serv.ss_host, "*") != 0 ||
-        strcmp(serv.ss_slice, "root") != 0)) {
-      fprintf(stderr, "first row has to be for webserver\n");
-      exit(-1);
+    serv.ss_ip = GetWord(line, 3);
+
+    if (num == 0) {
+      /* the first row must be an entry for apache */
+      if (strcmp(serv.ss_host, "*") != 0 ||
+         strcmp(serv.ss_slice, "root") != 0) {
+       fprintf(stderr, "first row has to be for webserver\n");
+       exit(-1);
+      }
+      /* see if there's PLC netflow's domain name */
+      if (domainNamePLCNetflow != NULL) {
+       xfree(domainNamePLCNetflow);
+       domainNamePLCNetflow = NULL;
+      }
+      domainNamePLCNetflow = GetWord(line, 3);
     }
     if (num >= numAlloc) {
       numAlloc = MAX(numAlloc * 2, 8);
-      servs = realloc(servs, numAlloc * sizeof(ServiceSig));
+      servs = xrealloc(servs, numAlloc * sizeof(ServiceSig));
     }
     serv.ss_slicePos = WhichSlicePos(serv.ss_slice);
     if (slices[serv.ss_slicePos].si_inUse == 0 &&
@@ -317,6 +343,8 @@ ReadConfFile(void)
 
   fclose(f);
 
+#if 0
+  /* Faiyaz asked me to allow a single-entry codemux conf */
   if (num == 1) {
     if (numServices == 0) {
       fprintf(stderr, "nothing found in codemux.conf\n");
@@ -324,12 +352,18 @@ ReadConfFile(void)
     }
     return;
   }
+#endif
+  if (num < 1) {
+    fprintf(stderr, "no entry found in codemux.conf\n");
+    exit(-1);
+  }
 
   for (i = 0; i < numServices; i++) {
-    free(serviceSig[i].ss_host);
-    free(serviceSig[i].ss_slice);
+    xfree(serviceSig[i].ss_host);
+    xfree(serviceSig[i].ss_ip);
+    xfree(serviceSig[i].ss_slice);
   }
-  free(serviceSig);
+  xfree(serviceSig);
   serviceSig = servs;
   numServices = num;
   confFileReadTime = statBuf.st_mtime;
@@ -426,7 +460,7 @@ static int
 FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
 {
   char *end;
-  char *lowerBuf;
+  char lowerBuf[FB_ALLOCSIZE];
   char *hostVal;
   char *buf = fb->fb_buf;
   char orig[256];
@@ -435,7 +469,7 @@ FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
   int i;
   int len;
 #endif
-    
+
   if (strstr(buf, "\n\r\n") == NULL && strstr(buf, "\n\n") == NULL)
     return(FAILURE);
 
@@ -444,7 +478,7 @@ FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
   fb->fb_used += InsertHeader(buf, fb->fb_used + 1, orig);
     
   /* get just the header, so we can work on it */
-  LOCAL_STR_DUP_LOWER(lowerBuf, buf);
+  StrcpyLower(lowerBuf, buf);
   if ((end = strstr(lowerBuf, "\n\r\n")) == NULL)
     end = strstr(lowerBuf, "\n\n");
   *end = '\0';
@@ -472,7 +506,6 @@ FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
            DoesDotlessSuffixMatch(hostVal, 0, serviceSig[i].ss_host)) {
          *whichService = i;
          free(hostVal);
-         /* printf("%s", buf); */
          return(SUCCESS);
        }
       }
@@ -536,8 +569,12 @@ StartConnect(int origFD, int whichService)
   memset(&dest, 0, sizeof(dest));
   dest.sin_family = AF_INET;
   dest.sin_port = htons(serviceSig[whichService].ss_port);
-  dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-  
+  if (serviceSig[whichService].ss_ip != NULL) {
+       dest.sin_addr.s_addr = inet_addr(serviceSig[whichService].ss_ip);
+  } else {
+       dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+  }
+
   /* start connection process - we should be told that it's in
      progress */
   if (connect(sock, (struct sockaddr *) &dest, sizeof(dest)) != -1 || 
@@ -645,6 +682,10 @@ ReallyCloseSocks(void)
       SliceConnsDec(sockInfo[fd].si_whichService);
       sockInfo[fd].si_whichService = -1;
     }
+    /* KyoungSoo*/
+    if (sockInfo[fd].si_peerFd >= 0) {
+      sockInfo[sockInfo[fd].si_peerFd].si_peerFd = -1;
+    }
   }
   numSocksToClose = 0;
 }
@@ -664,7 +705,7 @@ SocketReadyToRead(int fd)
   }
 
   if ((fb = si->si_readBuf) == NULL) {
-    fb = si->si_readBuf = calloc(1, sizeof(FlowBuf));
+    fb = si->si_readBuf = xcalloc(1, sizeof(FlowBuf));
     fb->fb_refs = 1;
     if (si->si_peerFd >= 0) {
       sockInfo[si->si_peerFd].si_writeBuf = fb;
@@ -673,10 +714,10 @@ SocketReadyToRead(int fd)
   }
 
   if (fb->fb_buf == NULL)
-    fb->fb_buf = malloc(FB_ALLOCSIZE);
+    fb->fb_buf = xmalloc(FB_ALLOCSIZE);
 
   /* determine read buffer size - if 0, then block reads and return */
-  if ((spaceLeft = FB_SIZE - fb->fb_used) < 0) {
+  if ((spaceLeft = FB_SIZE - fb->fb_used) <= 0) {
     if (si->si_needsHeaderSince) {
       write(fd, err400BadRequest, strlen(err400BadRequest));
       CloseSock(fd);
@@ -686,8 +727,8 @@ SocketReadyToRead(int fd)
       ClearFd(fd, &masterReadSet);
       return;
     }
-  }
-
+  } 
+  
   /* read as much as allowed, and is available */
   if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) {
     CloseSock(fd);
@@ -700,6 +741,7 @@ SocketReadyToRead(int fd)
   if (res == -1) {
     if (errno == EAGAIN)
       return;
+    TRACE("fd=%d errno=%d errstr=%s\n",fd, errno, strerror(errno));
     CloseSock(fd);
     if (fb->fb_used == 0 && si->si_peerFd >= 0) {
       CloseSock(si->si_peerFd);
@@ -709,7 +751,7 @@ SocketReadyToRead(int fd)
   }
   fb->fb_used += res;
   fb->fb_buf[fb->fb_used] = 0; /* terminate it for convenience */
-  printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
+  //  printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
 
   /* if we need header, check if we've gotten it. if so, do
      modifications and continue. if not, check if we've read the
@@ -731,6 +773,25 @@ SocketReadyToRead(int fd)
     //    printf("found service %d\n", whichService);
     slice = ServiceToSlice(whichService);
 
+    /* if it needs to be redirected to PLC, let it be handled here */
+    if (whichService == 0 && domainNamePLCNetflow != NULL &&
+       strcmp(slice->si_sliceName, "root") == 0) {
+      char msg[1024];
+      int len;
+      static const char* resp302 = 
+       "HTTP/1.0 302 Found\r\n"
+       "Location: http://%s\r\n"
+       "Cache-Control: no-cache, no-store\r\n"
+       "Content-type: text/html\r\n"
+       "Connection: close\r\n"
+       "\r\n"
+       "Your request is being redirected to PLC Netflow http://%s\n";
+      len = snprintf(msg, sizeof(msg), resp302, 
+                    domainNamePLCNetflow, domainNamePLCNetflow);
+      write(fd, msg, len);
+      CloseSock(fd);
+      return;
+    }
     /* no service can have more than some absolute max number of
        connections. Also, when we're too busy, start enforcing
        fairness across the servers */
@@ -738,6 +799,7 @@ SocketReadyToRead(int fd)
        (numTotalSliceConns > FAIRNESS_CUTOFF && 
         slice->si_numConns > MAX_CONNS/numActiveSlices)) {
       write(fd, err503TooBusy, strlen(err503TooBusy));
+      TRACE("CloseSock(): fd=%d too busy\n", fd);
       CloseSock(fd);
       return;
     }
@@ -758,6 +820,7 @@ SocketReadyToRead(int fd)
     numNeedingHeaders--;
     if (StartConnect(fd, whichService) != SUCCESS) {
       write(fd, err503Unavailable, strlen(err503Unavailable));
+      TRACE("CloseSock(): fd=%d StartConnect() failed\n", fd);
       CloseSock(fd);
       return;
     }
@@ -767,9 +830,13 @@ SocketReadyToRead(int fd)
   /* write anything possible */
   if (WriteAvailData(si->si_peerFd) != SUCCESS) {
     /* assume the worst and close */
+    TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
+         fd, errno, strerror(errno));
     CloseSock(fd);
-    CloseSock(si->si_peerFd);
-    si->si_peerFd = -1;
+    if (si->si_peerFd >=0) {
+      CloseSock(si->si_peerFd);
+      si->si_peerFd = -1;
+    }
   }
 }
 /*-----------------------------------------------------------------*/
@@ -790,6 +857,8 @@ SocketReadyToWrite(int fd)
   /* if we have data, write it */
   if (WriteAvailData(fd) != SUCCESS) {
    /* assume the worst and close */
+    TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
+         fd, errno, strerror(errno));
     CloseSock(fd);
     if (si->si_peerFd >= 0) {
       CloseSock(si->si_peerFd);
@@ -799,8 +868,9 @@ SocketReadyToWrite(int fd)
   }
 
   /* if peer is closed and we're done writing, we should close */
-  if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0)
+  if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0) {
     CloseSock(fd);
+  }
 }
 /*-----------------------------------------------------------------*/
 static void
@@ -836,7 +906,7 @@ CloseReqlessConns(void)
   /* if it's too old, close it */
   for (i = 0; i < highestSetFd+1; i++) {
     if (sockInfo[i].si_needsHeaderSince &&
-       (now - sockInfo[i].si_needsHeaderSince) > maxAge)
+       (now - sockInfo[i].si_needsHeaderSince) > maxAge) 
       CloseSock(i);
   }
 }
@@ -910,6 +980,11 @@ MainLoop(int lisSock)
       socklen_t lenAddr = sizeof(addr);
       if ((newSock = accept(lisSock, (struct sockaddr *) &addr, 
                            &lenAddr)) >= 0) {
+       /* make socket non-blocking */
+       if (fcntl(newSock, F_SETFL, O_NONBLOCK) < 0) {
+         close(newSock);
+         continue;
+       }
        memset(&sockInfo[newSock], 0, sizeof(SockInfo));
        sockInfo[newSock].si_needsHeaderSince = now;
        numNeedingHeaders++;
@@ -964,19 +1039,8 @@ static int
 OpenLogFile(void)
 {
   static const char* logfile = "/var/log/codemux.log";
-  static const char* oldlogfile = "/var/log/codemux.log.old";
   int logfd;
 
-  /* if the previous log file exists,
-     rename it to the oldlogfile */
-  if (access(logfile, F_OK) == 0) {
-    if (rename(logfile, oldlogfile) < 0) {
-      fprintf(stderr, "cannot rotate the logfile err=%s\n",
-             strerror(errno));
-      exit(-1);
-    }
-  }
-
   logfd = open(logfile, O_WRONLY | O_APPEND | O_CREAT, 0600);
   if (logfd < 0) {
     fprintf(stderr, "cannot open the logfile err=%s\n",
@@ -1006,9 +1070,29 @@ main(int argc, char *argv[])
 {
   int lisSock;
   int logFd;
+  int doDaemon = 1;
+  int opt;
+  struct in_addr lisAddress = { .s_addr = htonl(INADDR_ANY) };
+
+  while ((opt = getopt(argc, argv, "dl:")) != -1) {
+    switch (opt) {
+      case 'd':
+       doDaemon = 0;
+       break;
+      case 'l':
+       if (inet_pton(AF_INET, optarg, &lisAddress) <= 0) {
+         fprintf(stderr, "`%s' is not a valid address\n", optarg);
+         exit(-1);
+       }
+       break;
+      default:
+       fprintf(stderr, "Usage: %s [-d] [-l <listening address>]\n", argv[0]);
+       exit(-1);
+    }
+  }
 
   /* do the daemon stuff */
-  if (argc <= 1 || strcmp(argv[1], "-d") != 0) {
+  if (doDaemon) {
     if (InitDaemon() < 0) {
       fprintf(stderr, "codemux daemon_init() failed\n");
       exit(-1);
@@ -1016,7 +1100,8 @@ main(int argc, char *argv[])
   }
 
   /* create the accept socket */
-  if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE)) < 0) {
+  if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE,
+                                          &lisAddress)) < 0) {
     fprintf(stderr, "failed creating accept socket\n");
     exit(-1);
   }
@@ -1025,6 +1110,10 @@ main(int argc, char *argv[])
   /* open the log file */
   logFd = OpenLogFile();
 
+
+  /* write down the version */
+  fprintf(stderr, "CoDemux version %s started\n", CODEMUX_VERSION);
+
   while (1) {
     numForks++;
     if (fork()) {