Setting tag codemux-0.1-15
[codemux.git] / codemux.c
index 732059a..0b6233c 100644 (file)
--- a/codemux.c
+++ b/codemux.c
 #include <stdlib.h>
 #include <time.h>
 #include <unistd.h>
-#include "applib.h"
+#include <string.h>
+#include "codemuxlib.h"
+#include "debug.h"
+
+#ifdef DEBUG
+HANDLE hdebugLog;
+int defaultTraceSync;
+#endif
 
 #define CONF_FILE "/etc/codemux/codemux.conf"
 #define DEMUX_PORT 80
-#define REAL_WEBSERVER_CONFLINE "* root 1080"
+#define PIDFILE "/var/run/codemux.pid"
 #define TARG_SETSIZE 4096
 
 /* set aside some small number of fds for us, allow the rest for
@@ -30,6 +37,8 @@
    among them */
 #define FAIRNESS_CUTOFF (MAX_CONNS * 0.85)
 
+/* codemux version, from Makefile, or specfile */
+#define CODEMUX_VERSION RPM_VERSION
 
 typedef struct FlowBuf {
   int fb_refs;                 /* num refs */
@@ -55,6 +64,7 @@ typedef struct ServiceSig {
   char *ss_host;               /* suffix in host */
   char *ss_slice;
   short ss_port;
+  char *ss_ip;
   int ss_slicePos;             /* position in slices array */
 } ServiceSig;
 
@@ -85,7 +95,8 @@ static int numNeedingHeaders; /* how many conns waiting on headers? */
 
 static int numForks;
 
-HANDLE hdebugLog;
+/* PLC netflow domain name like netflow.planet-lab.org */
+static char* domainNamePLCNetflow = NULL;
 
 #ifndef SO_SETXID
 #define SO_SETXID SO_PEERCRED
@@ -108,8 +119,10 @@ DumpStatus(int fd)
   int len;
 
   sprintf(start, 
+         "CoDemux version %s\n"
          "numForks %d, numActiveSlices %d, numTotalSliceConns %d\n"
          "numNeedingHeaders %d, anySliceXidsNeeded %d\n",
+         CODEMUX_VERSION,
          numForks, numActiveSlices, numTotalSliceConns,
          numNeedingHeaders, anySliceXidsNeeded);
   start += strlen(start);
@@ -163,14 +176,14 @@ GetSliceXids(void)
     int xid;
 
     if ((temp = strchr(line, ':')) == NULL)
-      continue;                        /* weird line */
+      goto next_line;                  /* weird line */
     *temp = '\0';              /* terminate slice name */
     temp++;
     if ((temp = strchr(temp+1, ':')) == NULL)
-      continue;                        /* weird line */
+      goto next_line;  /* weird line */
     if ((xid = atoi(temp+1)) < 1)
-      continue;                        /* weird xid */
-
+      goto next_line;  /* weird xid */
+    
     /* we've got a slice name and xid, let's try to match */
     for (i = 0; i < numSlices; i++) {
       if (slices[i].si_xid == 0 &&
@@ -179,6 +192,9 @@ GetSliceXids(void)
        break;
       }
     }
+  next_line:
+    if (line)
+      xfree(line);
   }
 
   /* assume service 0 is the root service, and don't check it since
@@ -235,11 +251,11 @@ WhichSlicePos(char *slice)
 
   if (numSlices >= numSlicesAlloc) {
     numSlicesAlloc = MAX(8, numSlicesAlloc * 2);
-    slices = realloc(slices, numSlicesAlloc * sizeof(SliceInfo));
+    slices = xrealloc(slices, numSlicesAlloc * sizeof(SliceInfo));
   }
 
   memset(&slices[numSlices], 0, sizeof(SliceInfo));
-  slices[numSlices].si_sliceName = strdup(slice);
+  slices[numSlices].si_sliceName = xstrdup(slice);
   numSlices++;
   return(numSlices-1);
 }
@@ -279,18 +295,13 @@ ReadConfFile(void)
     ServiceSig serv;
     int port;
     if (line != NULL)
-      free(line);
-
-    /* on the first pass, put in a fake entry for apache */
-    if (num == 0)
-      line = strdup(REAL_WEBSERVER_CONFLINE);
-    else {
-      if ((line = GetNextLine(f)) == NULL)
-       break;
-    }
+      xfree(line);
+    
+    if ((line = GetNextLine(f)) == NULL)
+      break;
 
     memset(&serv, 0, sizeof(serv));
-    if (WordCount(line) != 3) {
+    if (WordCount(line) < 3) {
       fprintf(stderr, "bad line: %s\n", line);
       continue;
     }
@@ -302,9 +313,25 @@ ReadConfFile(void)
 
     serv.ss_host = GetWord(line, 0);
     serv.ss_slice = GetWord(line, 1);
+    serv.ss_ip = GetWord(line, 3);
+
+    if (num == 0) {
+      /* the first row must be an entry for apache */
+      if (strcmp(serv.ss_host, "*") != 0 ||
+         strcmp(serv.ss_slice, "root") != 0) {
+       fprintf(stderr, "first row has to be for webserver\n");
+       exit(-1);
+      }
+      /* see if there's PLC netflow's domain name */
+      if (domainNamePLCNetflow != NULL) {
+       xfree(domainNamePLCNetflow);
+       domainNamePLCNetflow = NULL;
+      }
+      domainNamePLCNetflow = GetWord(line, 3);
+    }
     if (num >= numAlloc) {
       numAlloc = MAX(numAlloc * 2, 8);
-      servs = realloc(servs, numAlloc * sizeof(ServiceSig));
+      servs = xrealloc(servs, numAlloc * sizeof(ServiceSig));
     }
     serv.ss_slicePos = WhichSlicePos(serv.ss_slice);
     if (slices[serv.ss_slicePos].si_inUse == 0 &&
@@ -316,6 +343,8 @@ ReadConfFile(void)
 
   fclose(f);
 
+#if 0
+  /* Faiyaz asked me to allow a single-entry codemux conf */
   if (num == 1) {
     if (numServices == 0) {
       fprintf(stderr, "nothing found in codemux.conf\n");
@@ -323,12 +352,18 @@ ReadConfFile(void)
     }
     return;
   }
+#endif
+  if (num < 1) {
+    fprintf(stderr, "no entry found in codemux.conf\n");
+    exit(-1);
+  }
 
   for (i = 0; i < numServices; i++) {
-    free(serviceSig[i].ss_host);
-    free(serviceSig[i].ss_slice);
+    xfree(serviceSig[i].ss_host);
+    xfree(serviceSig[i].ss_ip);
+    xfree(serviceSig[i].ss_slice);
   }
-  free(serviceSig);
+  xfree(serviceSig);
   serviceSig = servs;
   numServices = num;
   confFileReadTime = statBuf.st_mtime;
@@ -425,7 +460,7 @@ static int
 FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
 {
   char *end;
-  char *lowerBuf;
+  char lowerBuf[FB_ALLOCSIZE];
   char *hostVal;
   char *buf = fb->fb_buf;
   char orig[256];
@@ -434,7 +469,7 @@ FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
   int i;
   int len;
 #endif
-    
+
   if (strstr(buf, "\n\r\n") == NULL && strstr(buf, "\n\n") == NULL)
     return(FAILURE);
 
@@ -443,7 +478,7 @@ FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
   fb->fb_used += InsertHeader(buf, fb->fb_used + 1, orig);
     
   /* get just the header, so we can work on it */
-  LOCAL_STR_DUP_LOWER(lowerBuf, buf);
+  StrcpyLower(lowerBuf, buf);
   if ((end = strstr(lowerBuf, "\n\r\n")) == NULL)
     end = strstr(lowerBuf, "\n\n");
   *end = '\0';
@@ -471,7 +506,6 @@ FindService(FlowBuf *fb, int *whichService, struct in_addr addr)
            DoesDotlessSuffixMatch(hostVal, 0, serviceSig[i].ss_host)) {
          *whichService = i;
          free(hostVal);
-         /* printf("%s", buf); */
          return(SUCCESS);
        }
       }
@@ -535,8 +569,12 @@ StartConnect(int origFD, int whichService)
   memset(&dest, 0, sizeof(dest));
   dest.sin_family = AF_INET;
   dest.sin_port = htons(serviceSig[whichService].ss_port);
-  dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
-  
+  if (serviceSig[whichService].ss_ip != NULL) {
+       dest.sin_addr.s_addr = inet_addr(serviceSig[whichService].ss_ip);
+  } else {
+       dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
+  }
+
   /* start connection process - we should be told that it's in
      progress */
   if (connect(sock, (struct sockaddr *) &dest, sizeof(dest)) != -1 || 
@@ -644,6 +682,10 @@ ReallyCloseSocks(void)
       SliceConnsDec(sockInfo[fd].si_whichService);
       sockInfo[fd].si_whichService = -1;
     }
+    /* KyoungSoo*/
+    if (sockInfo[fd].si_peerFd >= 0) {
+      sockInfo[sockInfo[fd].si_peerFd].si_peerFd = -1;
+    }
   }
   numSocksToClose = 0;
 }
@@ -663,7 +705,7 @@ SocketReadyToRead(int fd)
   }
 
   if ((fb = si->si_readBuf) == NULL) {
-    fb = si->si_readBuf = calloc(1, sizeof(FlowBuf));
+    fb = si->si_readBuf = xcalloc(1, sizeof(FlowBuf));
     fb->fb_refs = 1;
     if (si->si_peerFd >= 0) {
       sockInfo[si->si_peerFd].si_writeBuf = fb;
@@ -672,10 +714,10 @@ SocketReadyToRead(int fd)
   }
 
   if (fb->fb_buf == NULL)
-    fb->fb_buf = malloc(FB_ALLOCSIZE);
+    fb->fb_buf = xmalloc(FB_ALLOCSIZE);
 
   /* determine read buffer size - if 0, then block reads and return */
-  if ((spaceLeft = FB_SIZE - fb->fb_used) < 0) {
+  if ((spaceLeft = FB_SIZE - fb->fb_used) <= 0) {
     if (si->si_needsHeaderSince) {
       write(fd, err400BadRequest, strlen(err400BadRequest));
       CloseSock(fd);
@@ -685,8 +727,8 @@ SocketReadyToRead(int fd)
       ClearFd(fd, &masterReadSet);
       return;
     }
-  }
-
+  } 
+  
   /* read as much as allowed, and is available */
   if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) {
     CloseSock(fd);
@@ -699,6 +741,7 @@ SocketReadyToRead(int fd)
   if (res == -1) {
     if (errno == EAGAIN)
       return;
+    TRACE("fd=%d errno=%d errstr=%s\n",fd, errno, strerror(errno));
     CloseSock(fd);
     if (fb->fb_used == 0 && si->si_peerFd >= 0) {
       CloseSock(si->si_peerFd);
@@ -708,7 +751,7 @@ SocketReadyToRead(int fd)
   }
   fb->fb_used += res;
   fb->fb_buf[fb->fb_used] = 0; /* terminate it for convenience */
-  printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
+  //  printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
 
   /* if we need header, check if we've gotten it. if so, do
      modifications and continue. if not, check if we've read the
@@ -724,12 +767,31 @@ SocketReadyToRead(int fd)
       return;
     }
 
-    printf("trying to find service\n");
+    //    printf("trying to find service\n");
     if (FindService(fb, &whichService, si->si_cliAddr) != SUCCESS)
       return;
-    printf("found service %d\n", whichService);
+    //    printf("found service %d\n", whichService);
     slice = ServiceToSlice(whichService);
 
+    /* if it needs to be redirected to PLC, let it be handled here */
+    if (whichService == 0 && domainNamePLCNetflow != NULL &&
+       strcmp(slice->si_sliceName, "root") == 0) {
+      char msg[1024];
+      int len;
+      static const char* resp302 = 
+       "HTTP/1.0 302 Found\r\n"
+       "Location: http://%s\r\n"
+       "Cache-Control: no-cache, no-store\r\n"
+       "Content-type: text/html\r\n"
+       "Connection: close\r\n"
+       "\r\n"
+       "Your request is being redirected to PLC Netflow http://%s\n";
+      len = snprintf(msg, sizeof(msg), resp302, 
+                    domainNamePLCNetflow, domainNamePLCNetflow);
+      write(fd, msg, len);
+      CloseSock(fd);
+      return;
+    }
     /* no service can have more than some absolute max number of
        connections. Also, when we're too busy, start enforcing
        fairness across the servers */
@@ -737,21 +799,28 @@ SocketReadyToRead(int fd)
        (numTotalSliceConns > FAIRNESS_CUTOFF && 
         slice->si_numConns > MAX_CONNS/numActiveSlices)) {
       write(fd, err503TooBusy, strlen(err503TooBusy));
+      TRACE("CloseSock(): fd=%d too busy\n", fd);
       CloseSock(fd);
       return;
     }
 
     if (slice->si_xid > 0) {
+      static int first = 1;
       setsockopt(fd, SOL_SOCKET, SO_SETXID, 
                 &slice->si_xid, sizeof(slice->si_xid));
-      fprintf(stderr, "setsockopt() with XID = %d name = %s\n", 
-             slice->si_xid, slice->si_sliceName);
+      if (first) {
+       /* just to log it for once */
+       fprintf(stderr, "setsockopt() with XID = %d name = %s\n", 
+               slice->si_xid, slice->si_sliceName);
+       first = 0;
+      }
     }
 
     si->si_needsHeaderSince = 0;
     numNeedingHeaders--;
     if (StartConnect(fd, whichService) != SUCCESS) {
       write(fd, err503Unavailable, strlen(err503Unavailable));
+      TRACE("CloseSock(): fd=%d StartConnect() failed\n", fd);
       CloseSock(fd);
       return;
     }
@@ -761,9 +830,13 @@ SocketReadyToRead(int fd)
   /* write anything possible */
   if (WriteAvailData(si->si_peerFd) != SUCCESS) {
     /* assume the worst and close */
+    TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
+         fd, errno, strerror(errno));
     CloseSock(fd);
-    CloseSock(si->si_peerFd);
-    si->si_peerFd = -1;
+    if (si->si_peerFd >=0) {
+      CloseSock(si->si_peerFd);
+      si->si_peerFd = -1;
+    }
   }
 }
 /*-----------------------------------------------------------------*/
@@ -784,6 +857,8 @@ SocketReadyToWrite(int fd)
   /* if we have data, write it */
   if (WriteAvailData(fd) != SUCCESS) {
    /* assume the worst and close */
+    TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", 
+         fd, errno, strerror(errno));
     CloseSock(fd);
     if (si->si_peerFd >= 0) {
       CloseSock(si->si_peerFd);
@@ -793,8 +868,9 @@ SocketReadyToWrite(int fd)
   }
 
   /* if peer is closed and we're done writing, we should close */
-  if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0)
+  if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0) {
     CloseSock(fd);
+  }
 }
 /*-----------------------------------------------------------------*/
 static void
@@ -830,7 +906,7 @@ CloseReqlessConns(void)
   /* if it's too old, close it */
   for (i = 0; i < highestSetFd+1; i++) {
     if (sockInfo[i].si_needsHeaderSince &&
-       (now - sockInfo[i].si_needsHeaderSince) > maxAge)
+       (now - sockInfo[i].si_needsHeaderSince) > maxAge) 
       CloseSock(i);
   }
 }
@@ -904,6 +980,11 @@ MainLoop(int lisSock)
       socklen_t lenAddr = sizeof(addr);
       if ((newSock = accept(lisSock, (struct sockaddr *) &addr, 
                            &lenAddr)) >= 0) {
+       /* make socket non-blocking */
+       if (fcntl(newSock, F_SETFL, O_NONBLOCK) < 0) {
+         close(newSock);
+         continue;
+       }
        memset(&sockInfo[newSock], 0, sizeof(SockInfo));
        sockInfo[newSock].si_needsHeaderSince = now;
        numNeedingHeaders++;
@@ -916,17 +997,123 @@ MainLoop(int lisSock)
   }
 }
 /*-----------------------------------------------------------------*/
+static int 
+InitDaemon(void)
+{
+  pid_t pid;
+  FILE *pidfile;
+  
+  pidfile = fopen(PIDFILE, "w");
+  if (pidfile == NULL) {
+    fprintf(stderr, "%s creation failed\n", PIDFILE);
+    return(-1);
+  }
+
+  if ((pid = fork()) < 0) {
+    fclose(pidfile);
+    return(-1);
+  }
+  else if (pid != 0) {
+    /* i'm the parent, writing down the child pid  */
+    fprintf(pidfile, "%u\n", pid);
+    fclose(pidfile);
+    exit(0);
+  }
+
+  /* close the pid file */
+  fclose(pidfile);
+
+  /* routines for any daemon process
+     1. create a new session 
+     2. change directory to the root
+     3. change the file creation permission 
+  */
+  setsid();
+  chdir("/");
+  umask(0);
+
+  return(0);
+}
+/*-----------------------------------------------------------------*/
+static int
+OpenLogFile(void)
+{
+  static const char* logfile = "/var/log/codemux.log";
+  int logfd;
+
+  logfd = open(logfile, O_WRONLY | O_APPEND | O_CREAT, 0600);
+  if (logfd < 0) {
+    fprintf(stderr, "cannot open the logfile err=%s\n",
+           strerror(errno));
+    exit(-1);
+  }
+
+  /* duplicate logfile to stderr */
+  if (dup2(logfd, STDERR_FILENO) != STDERR_FILENO) {
+    fprintf(stderr, "cannot open the logfile err=%s\n",
+           strerror(errno));
+    exit(-1);
+  }
+  
+  /* set the close-on-exec flag */
+  if (fcntl(STDERR_FILENO, F_SETFD, 1) != 0) {
+    fprintf(stderr, "fcntl to set the close-on-exec flag failed err=%s\n",
+           strerror(errno));
+    exit(-1);
+  }
+
+  return logfd;
+}
+/*-----------------------------------------------------------------*/
 int
 main(int argc, char *argv[])
 {
   int lisSock;
+  int logFd;
+  int doDaemon = 1;
+  int opt;
+  struct in_addr lisAddress = { .s_addr = htonl(INADDR_ANY) };
+
+  while ((opt = getopt(argc, argv, "dl:")) != -1) {
+    switch (opt) {
+      case 'd':
+       doDaemon = 0;
+       break;
+      case 'l':
+       if (inet_pton(AF_INET, optarg, &lisAddress) <= 0) {
+         fprintf(stderr, "`%s' is not a valid address\n", optarg);
+         exit(-1);
+       }
+       break;
+      default:
+       fprintf(stderr, "Usage: %s [-d] [-l <listening address>]\n", argv[0]);
+       exit(-1);
+    }
+  }
 
-  if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE)) < 0) {
+  /* do the daemon stuff */
+  if (doDaemon) {
+    if (InitDaemon() < 0) {
+      fprintf(stderr, "codemux daemon_init() failed\n");
+      exit(-1);
+    }
+  }
+
+  /* create the accept socket */
+  if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE,
+                                          &lisAddress)) < 0) {
     fprintf(stderr, "failed creating accept socket\n");
     exit(-1);
   }
   SetFd(lisSock, &masterReadSet);
 
+  /* open the log file */
+  logFd = OpenLogFile();
+
+
+  /* write down the version */
+  fprintf(stderr, "CoDemux version %s started\n", CODEMUX_VERSION);
+
   while (1) {
     numForks++;
     if (fork()) {