#include <stdlib.h>
#include <time.h>
#include <unistd.h>
-#include "applib.h"
+#include <string.h>
+#include "codemuxlib.h"
+#include "debug.h"
+
+#ifdef DEBUG
+HANDLE hdebugLog;
+int defaultTraceSync;
+#endif
#define CONF_FILE "/etc/codemux/codemux.conf"
#define DEMUX_PORT 80
-#define REAL_WEBSERVER_CONFLINE "* root 1080"
+#define PIDFILE "/var/run/codemux.pid"
#define TARG_SETSIZE 4096
/* set aside some small number of fds for us, allow the rest for
among them */
#define FAIRNESS_CUTOFF (MAX_CONNS * 0.85)
+/* codemux version */
+#define CODEMUX_VERSION "0.4"
typedef struct FlowBuf {
int fb_refs; /* num refs */
static int numForks;
-HANDLE hdebugLog;
+/* PLC netflow domain name like netflow.planet-lab.org */
+static char* domainNamePLCNetflow = NULL;
#ifndef SO_SETXID
#define SO_SETXID SO_PEERCRED
int len;
sprintf(start,
+ "CoDemux version %s\n"
"numForks %d, numActiveSlices %d, numTotalSliceConns %d\n"
"numNeedingHeaders %d, anySliceXidsNeeded %d\n",
+ CODEMUX_VERSION,
numForks, numActiveSlices, numTotalSliceConns,
numNeedingHeaders, anySliceXidsNeeded);
start += strlen(start);
int xid;
if ((temp = strchr(line, ':')) == NULL)
- continue; /* weird line */
+ goto next_line; /* weird line */
*temp = '\0'; /* terminate slice name */
temp++;
if ((temp = strchr(temp+1, ':')) == NULL)
- continue; /* weird line */
+ goto next_line; /* weird line */
if ((xid = atoi(temp+1)) < 1)
- continue; /* weird xid */
-
+ goto next_line; /* weird xid */
+
/* we've got a slice name and xid, let's try to match */
for (i = 0; i < numSlices; i++) {
if (slices[i].si_xid == 0 &&
break;
}
}
+ next_line:
+ if (line)
+ xfree(line);
}
/* assume service 0 is the root service, and don't check it since
if (numSlices >= numSlicesAlloc) {
numSlicesAlloc = MAX(8, numSlicesAlloc * 2);
- slices = realloc(slices, numSlicesAlloc * sizeof(SliceInfo));
+ slices = xrealloc(slices, numSlicesAlloc * sizeof(SliceInfo));
}
memset(&slices[numSlices], 0, sizeof(SliceInfo));
- slices[numSlices].si_sliceName = strdup(slice);
+ slices[numSlices].si_sliceName = xstrdup(slice);
numSlices++;
return(numSlices-1);
}
ServiceSig serv;
int port;
if (line != NULL)
- free(line);
-
- /* on the first pass, put in a fake entry for apache */
- if (num == 0)
- line = strdup(REAL_WEBSERVER_CONFLINE);
- else {
- if ((line = GetNextLine(f)) == NULL)
- break;
- }
+ xfree(line);
+
+ if ((line = GetNextLine(f)) == NULL)
+ break;
memset(&serv, 0, sizeof(serv));
- if (WordCount(line) != 3) {
+ if (WordCount(line) < 3) {
fprintf(stderr, "bad line: %s\n", line);
continue;
}
serv.ss_host = GetWord(line, 0);
serv.ss_slice = GetWord(line, 1);
+
+ if (num == 0) {
+ /* the first row must be an entry for apache */
+ if (strcmp(serv.ss_host, "*") != 0 ||
+ strcmp(serv.ss_slice, "root") != 0) {
+ fprintf(stderr, "first row has to be for webserver\n");
+ exit(-1);
+ }
+ /* see if there's PLC netflow's domain name */
+ if (domainNamePLCNetflow != NULL) {
+ xfree(domainNamePLCNetflow);
+ domainNamePLCNetflow = NULL;
+ }
+ domainNamePLCNetflow = GetWord(line, 3);
+ }
if (num >= numAlloc) {
numAlloc = MAX(numAlloc * 2, 8);
- servs = realloc(servs, numAlloc * sizeof(ServiceSig));
+ servs = xrealloc(servs, numAlloc * sizeof(ServiceSig));
}
serv.ss_slicePos = WhichSlicePos(serv.ss_slice);
if (slices[serv.ss_slicePos].si_inUse == 0 &&
fclose(f);
+#if 0
+ /* Faiyaz asked me to allow a single-entry codemux conf */
if (num == 1) {
if (numServices == 0) {
fprintf(stderr, "nothing found in codemux.conf\n");
}
return;
}
+#endif
+ if (num < 1) {
+ fprintf(stderr, "no entry found in codemux.conf\n");
+ exit(-1);
+ }
for (i = 0; i < numServices; i++) {
- free(serviceSig[i].ss_host);
- free(serviceSig[i].ss_slice);
+ xfree(serviceSig[i].ss_host);
+ xfree(serviceSig[i].ss_slice);
}
- free(serviceSig);
+ xfree(serviceSig);
serviceSig = servs;
numServices = num;
confFileReadTime = statBuf.st_mtime;
SliceConnsDec(sockInfo[fd].si_whichService);
sockInfo[fd].si_whichService = -1;
}
+ /* KyoungSoo*/
+ if (sockInfo[fd].si_peerFd >= 0) {
+ sockInfo[sockInfo[fd].si_peerFd].si_peerFd = -1;
+ }
}
numSocksToClose = 0;
}
}
if ((fb = si->si_readBuf) == NULL) {
- fb = si->si_readBuf = calloc(1, sizeof(FlowBuf));
+ fb = si->si_readBuf = xcalloc(1, sizeof(FlowBuf));
fb->fb_refs = 1;
if (si->si_peerFd >= 0) {
sockInfo[si->si_peerFd].si_writeBuf = fb;
}
if (fb->fb_buf == NULL)
- fb->fb_buf = malloc(FB_ALLOCSIZE);
+ fb->fb_buf = xmalloc(FB_ALLOCSIZE);
/* determine read buffer size - if 0, then block reads and return */
- if ((spaceLeft = FB_SIZE - fb->fb_used) < 0) {
+ if ((spaceLeft = FB_SIZE - fb->fb_used) <= 0) {
if (si->si_needsHeaderSince) {
write(fd, err400BadRequest, strlen(err400BadRequest));
CloseSock(fd);
ClearFd(fd, &masterReadSet);
return;
}
- }
-
+ }
+
/* read as much as allowed, and is available */
if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) {
CloseSock(fd);
if (res == -1) {
if (errno == EAGAIN)
return;
+ TRACE("fd=%d errno=%d errstr=%s\n",fd, errno, strerror(errno));
CloseSock(fd);
if (fb->fb_used == 0 && si->si_peerFd >= 0) {
CloseSock(si->si_peerFd);
}
fb->fb_used += res;
fb->fb_buf[fb->fb_used] = 0; /* terminate it for convenience */
- printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
+ // printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
/* if we need header, check if we've gotten it. if so, do
modifications and continue. if not, check if we've read the
return;
}
- printf("trying to find service\n");
+ // printf("trying to find service\n");
if (FindService(fb, &whichService, si->si_cliAddr) != SUCCESS)
return;
- printf("found service %d\n", whichService);
+ // printf("found service %d\n", whichService);
slice = ServiceToSlice(whichService);
+ /* if it needs to be redirected to PLC, let it be handled here */
+ if (whichService == 0 && domainNamePLCNetflow != NULL &&
+ strcmp(slice->si_sliceName, "root") == 0) {
+ char msg[1024];
+ int len;
+ static const char* resp302 =
+ "HTTP/1.0 302 Found\r\n"
+ "Location: http://%s\r\n"
+ "Cache-Control: no-cache, no-store\r\n"
+ "Content-type: text/html\r\n"
+ "Connection: close\r\n"
+ "\r\n"
+ "Your request is being redirected to PLC Netflow http://%s\n";
+ len = snprintf(msg, sizeof(msg), resp302,
+ domainNamePLCNetflow, domainNamePLCNetflow);
+ write(fd, msg, len);
+ CloseSock(fd);
+ return;
+ }
/* no service can have more than some absolute max number of
connections. Also, when we're too busy, start enforcing
fairness across the servers */
(numTotalSliceConns > FAIRNESS_CUTOFF &&
slice->si_numConns > MAX_CONNS/numActiveSlices)) {
write(fd, err503TooBusy, strlen(err503TooBusy));
+ TRACE("CloseSock(): fd=%d too busy\n", fd);
CloseSock(fd);
return;
}
if (slice->si_xid > 0) {
+ static int first = 1;
setsockopt(fd, SOL_SOCKET, SO_SETXID,
&slice->si_xid, sizeof(slice->si_xid));
- fprintf(stderr, "setsockopt() with XID = %d name = %s\n",
- slice->si_xid, slice->si_sliceName);
+ if (first) {
+ /* just to log it for once */
+ fprintf(stderr, "setsockopt() with XID = %d name = %s\n",
+ slice->si_xid, slice->si_sliceName);
+ first = 0;
+ }
}
si->si_needsHeaderSince = 0;
numNeedingHeaders--;
if (StartConnect(fd, whichService) != SUCCESS) {
write(fd, err503Unavailable, strlen(err503Unavailable));
+ TRACE("CloseSock(): fd=%d StartConnect() failed\n", fd);
CloseSock(fd);
return;
}
/* write anything possible */
if (WriteAvailData(si->si_peerFd) != SUCCESS) {
/* assume the worst and close */
+ TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n",
+ fd, errno, strerror(errno));
CloseSock(fd);
- CloseSock(si->si_peerFd);
- si->si_peerFd = -1;
+ if (si->si_peerFd >=0) {
+ CloseSock(si->si_peerFd);
+ si->si_peerFd = -1;
+ }
}
}
/*-----------------------------------------------------------------*/
/* if we have data, write it */
if (WriteAvailData(fd) != SUCCESS) {
/* assume the worst and close */
+ TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n",
+ fd, errno, strerror(errno));
CloseSock(fd);
if (si->si_peerFd >= 0) {
CloseSock(si->si_peerFd);
}
/* if peer is closed and we're done writing, we should close */
- if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0)
+ if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0) {
CloseSock(fd);
+ }
}
/*-----------------------------------------------------------------*/
static void
/* if it's too old, close it */
for (i = 0; i < highestSetFd+1; i++) {
if (sockInfo[i].si_needsHeaderSince &&
- (now - sockInfo[i].si_needsHeaderSince) > maxAge)
+ (now - sockInfo[i].si_needsHeaderSince) > maxAge)
CloseSock(i);
}
}
socklen_t lenAddr = sizeof(addr);
if ((newSock = accept(lisSock, (struct sockaddr *) &addr,
&lenAddr)) >= 0) {
+ /* make socket non-blocking */
+ if (fcntl(newSock, F_SETFL, O_NONBLOCK) < 0) {
+ close(newSock);
+ continue;
+ }
memset(&sockInfo[newSock], 0, sizeof(SockInfo));
sockInfo[newSock].si_needsHeaderSince = now;
numNeedingHeaders++;
}
}
/*-----------------------------------------------------------------*/
+static int
+InitDaemon(void)
+{
+ pid_t pid;
+ FILE *pidfile;
+
+ pidfile = fopen(PIDFILE, "w");
+ if (pidfile == NULL) {
+ fprintf(stderr, "%s creation failed\n", PIDFILE);
+ return(-1);
+ }
+
+ if ((pid = fork()) < 0) {
+ fclose(pidfile);
+ return(-1);
+ }
+ else if (pid != 0) {
+ /* i'm the parent, writing down the child pid */
+ fprintf(pidfile, "%u\n", pid);
+ fclose(pidfile);
+ exit(0);
+ }
+
+ /* close the pid file */
+ fclose(pidfile);
+
+ /* routines for any daemon process
+ 1. create a new session
+ 2. change directory to the root
+ 3. change the file creation permission
+ */
+ setsid();
+ chdir("/");
+ umask(0);
+
+ return(0);
+}
+/*-----------------------------------------------------------------*/
+static int
+OpenLogFile(void)
+{
+ static const char* logfile = "/var/log/codemux.log";
+ static const char* oldlogfile = "/var/log/codemux.log.old";
+ int logfd;
+
+ /* if the previous log file exists,
+ rename it to the oldlogfile */
+ if (access(logfile, F_OK) == 0) {
+ if (rename(logfile, oldlogfile) < 0) {
+ fprintf(stderr, "cannot rotate the logfile err=%s\n",
+ strerror(errno));
+ exit(-1);
+ }
+ }
+
+ logfd = open(logfile, O_WRONLY | O_APPEND | O_CREAT, 0600);
+ if (logfd < 0) {
+ fprintf(stderr, "cannot open the logfile err=%s\n",
+ strerror(errno));
+ exit(-1);
+ }
+
+ /* duplicate logfile to stderr */
+ if (dup2(logfd, STDERR_FILENO) != STDERR_FILENO) {
+ fprintf(stderr, "cannot open the logfile err=%s\n",
+ strerror(errno));
+ exit(-1);
+ }
+
+ /* set the close-on-exec flag */
+ if (fcntl(STDERR_FILENO, F_SETFD, 1) != 0) {
+ fprintf(stderr, "fcntl to set the close-on-exec flag failed err=%s\n",
+ strerror(errno));
+ exit(-1);
+ }
+
+ return logfd;
+}
+/*-----------------------------------------------------------------*/
int
main(int argc, char *argv[])
{
int lisSock;
+ int logFd;
+
+ /* do the daemon stuff */
+ if (argc <= 1 || strcmp(argv[1], "-d") != 0) {
+ if (InitDaemon() < 0) {
+ fprintf(stderr, "codemux daemon_init() failed\n");
+ exit(-1);
+ }
+ }
+ /* create the accept socket */
if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE)) < 0) {
fprintf(stderr, "failed creating accept socket\n");
exit(-1);
}
SetFd(lisSock, &masterReadSet);
+ /* open the log file */
+ logFd = OpenLogFile();
+
+
+ /* write down the version */
+ fprintf(stderr, "CoDemux version %s started\n", CODEMUX_VERSION);
+
while (1) {
numForks++;
if (fork()) {