X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=codemux.c;h=0b6233c2ff3ff8dd71504d08a0dc9d05d16afcbb;hb=270f61bc47809f5ebb4ad1d169a8bf146c5d559a;hp=3ca9ba0570f3ffc200d06b3fc58e7b15c6bb7d34;hpb=3238906f14511128dfbf6512e8bbaa2332ff9f36;p=codemux.git diff --git a/codemux.c b/codemux.c index 3ca9ba0..0b6233c 100644 --- a/codemux.c +++ b/codemux.c @@ -14,6 +14,12 @@ #include #include #include "codemuxlib.h" +#include "debug.h" + +#ifdef DEBUG +HANDLE hdebugLog; +int defaultTraceSync; +#endif #define CONF_FILE "/etc/codemux/codemux.conf" #define DEMUX_PORT 80 @@ -31,6 +37,8 @@ among them */ #define FAIRNESS_CUTOFF (MAX_CONNS * 0.85) +/* codemux version, from Makefile, or specfile */ +#define CODEMUX_VERSION RPM_VERSION typedef struct FlowBuf { int fb_refs; /* num refs */ @@ -56,6 +64,7 @@ typedef struct ServiceSig { char *ss_host; /* suffix in host */ char *ss_slice; short ss_port; + char *ss_ip; int ss_slicePos; /* position in slices array */ } ServiceSig; @@ -86,6 +95,9 @@ static int numNeedingHeaders; /* how many conns waiting on headers? */ static int numForks; +/* PLC netflow domain name like netflow.planet-lab.org */ +static char* domainNamePLCNetflow = NULL; + #ifndef SO_SETXID #define SO_SETXID SO_PEERCRED #endif @@ -107,8 +119,10 @@ DumpStatus(int fd) int len; sprintf(start, + "CoDemux version %s\n" "numForks %d, numActiveSlices %d, numTotalSliceConns %d\n" "numNeedingHeaders %d, anySliceXidsNeeded %d\n", + CODEMUX_VERSION, numForks, numActiveSlices, numTotalSliceConns, numNeedingHeaders, anySliceXidsNeeded); start += strlen(start); @@ -162,14 +176,14 @@ GetSliceXids(void) int xid; if ((temp = strchr(line, ':')) == NULL) - continue; /* weird line */ + goto next_line; /* weird line */ *temp = '\0'; /* terminate slice name */ temp++; if ((temp = strchr(temp+1, ':')) == NULL) - continue; /* weird line */ + goto next_line; /* weird line */ if ((xid = atoi(temp+1)) < 1) - continue; /* weird xid */ - + goto next_line; /* weird xid */ + /* we've got a slice name and xid, let's try to match */ for (i = 0; i < numSlices; i++) { if (slices[i].si_xid == 0 && @@ -178,6 +192,9 @@ GetSliceXids(void) break; } } + next_line: + if (line) + xfree(line); } /* assume service 0 is the root service, and don't check it since @@ -234,11 +251,11 @@ WhichSlicePos(char *slice) if (numSlices >= numSlicesAlloc) { numSlicesAlloc = MAX(8, numSlicesAlloc * 2); - slices = realloc(slices, numSlicesAlloc * sizeof(SliceInfo)); + slices = xrealloc(slices, numSlicesAlloc * sizeof(SliceInfo)); } memset(&slices[numSlices], 0, sizeof(SliceInfo)); - slices[numSlices].si_sliceName = strdup(slice); + slices[numSlices].si_sliceName = xstrdup(slice); numSlices++; return(numSlices-1); } @@ -278,13 +295,13 @@ ReadConfFile(void) ServiceSig serv; int port; if (line != NULL) - free(line); + xfree(line); if ((line = GetNextLine(f)) == NULL) break; memset(&serv, 0, sizeof(serv)); - if (WordCount(line) != 3) { + if (WordCount(line) < 3) { fprintf(stderr, "bad line: %s\n", line); continue; } @@ -296,16 +313,25 @@ ReadConfFile(void) serv.ss_host = GetWord(line, 0); serv.ss_slice = GetWord(line, 1); - - if (num == 0 && /* the first row must be an entry for apache */ - (strcmp(serv.ss_host, "*") != 0 || - strcmp(serv.ss_slice, "root") != 0)) { - fprintf(stderr, "first row has to be for webserver\n"); - exit(-1); + serv.ss_ip = GetWord(line, 3); + + if (num == 0) { + /* the first row must be an entry for apache */ + if (strcmp(serv.ss_host, "*") != 0 || + strcmp(serv.ss_slice, "root") != 0) { + fprintf(stderr, "first row has to be for webserver\n"); + exit(-1); + } + /* see if there's PLC netflow's domain name */ + if (domainNamePLCNetflow != NULL) { + xfree(domainNamePLCNetflow); + domainNamePLCNetflow = NULL; + } + domainNamePLCNetflow = GetWord(line, 3); } if (num >= numAlloc) { numAlloc = MAX(numAlloc * 2, 8); - servs = realloc(servs, numAlloc * sizeof(ServiceSig)); + servs = xrealloc(servs, numAlloc * sizeof(ServiceSig)); } serv.ss_slicePos = WhichSlicePos(serv.ss_slice); if (slices[serv.ss_slicePos].si_inUse == 0 && @@ -317,6 +343,8 @@ ReadConfFile(void) fclose(f); +#if 0 + /* Faiyaz asked me to allow a single-entry codemux conf */ if (num == 1) { if (numServices == 0) { fprintf(stderr, "nothing found in codemux.conf\n"); @@ -324,12 +352,18 @@ ReadConfFile(void) } return; } +#endif + if (num < 1) { + fprintf(stderr, "no entry found in codemux.conf\n"); + exit(-1); + } for (i = 0; i < numServices; i++) { - free(serviceSig[i].ss_host); - free(serviceSig[i].ss_slice); + xfree(serviceSig[i].ss_host); + xfree(serviceSig[i].ss_ip); + xfree(serviceSig[i].ss_slice); } - free(serviceSig); + xfree(serviceSig); serviceSig = servs; numServices = num; confFileReadTime = statBuf.st_mtime; @@ -426,7 +460,7 @@ static int FindService(FlowBuf *fb, int *whichService, struct in_addr addr) { char *end; - char *lowerBuf; + char lowerBuf[FB_ALLOCSIZE]; char *hostVal; char *buf = fb->fb_buf; char orig[256]; @@ -435,7 +469,7 @@ FindService(FlowBuf *fb, int *whichService, struct in_addr addr) int i; int len; #endif - + if (strstr(buf, "\n\r\n") == NULL && strstr(buf, "\n\n") == NULL) return(FAILURE); @@ -444,7 +478,7 @@ FindService(FlowBuf *fb, int *whichService, struct in_addr addr) fb->fb_used += InsertHeader(buf, fb->fb_used + 1, orig); /* get just the header, so we can work on it */ - LOCAL_STR_DUP_LOWER(lowerBuf, buf); + StrcpyLower(lowerBuf, buf); if ((end = strstr(lowerBuf, "\n\r\n")) == NULL) end = strstr(lowerBuf, "\n\n"); *end = '\0'; @@ -472,7 +506,6 @@ FindService(FlowBuf *fb, int *whichService, struct in_addr addr) DoesDotlessSuffixMatch(hostVal, 0, serviceSig[i].ss_host)) { *whichService = i; free(hostVal); - /* printf("%s", buf); */ return(SUCCESS); } } @@ -536,8 +569,12 @@ StartConnect(int origFD, int whichService) memset(&dest, 0, sizeof(dest)); dest.sin_family = AF_INET; dest.sin_port = htons(serviceSig[whichService].ss_port); - dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK); - + if (serviceSig[whichService].ss_ip != NULL) { + dest.sin_addr.s_addr = inet_addr(serviceSig[whichService].ss_ip); + } else { + dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK); + } + /* start connection process - we should be told that it's in progress */ if (connect(sock, (struct sockaddr *) &dest, sizeof(dest)) != -1 || @@ -645,6 +682,10 @@ ReallyCloseSocks(void) SliceConnsDec(sockInfo[fd].si_whichService); sockInfo[fd].si_whichService = -1; } + /* KyoungSoo*/ + if (sockInfo[fd].si_peerFd >= 0) { + sockInfo[sockInfo[fd].si_peerFd].si_peerFd = -1; + } } numSocksToClose = 0; } @@ -664,7 +705,7 @@ SocketReadyToRead(int fd) } if ((fb = si->si_readBuf) == NULL) { - fb = si->si_readBuf = calloc(1, sizeof(FlowBuf)); + fb = si->si_readBuf = xcalloc(1, sizeof(FlowBuf)); fb->fb_refs = 1; if (si->si_peerFd >= 0) { sockInfo[si->si_peerFd].si_writeBuf = fb; @@ -673,10 +714,10 @@ SocketReadyToRead(int fd) } if (fb->fb_buf == NULL) - fb->fb_buf = malloc(FB_ALLOCSIZE); + fb->fb_buf = xmalloc(FB_ALLOCSIZE); /* determine read buffer size - if 0, then block reads and return */ - if ((spaceLeft = FB_SIZE - fb->fb_used) < 0) { + if ((spaceLeft = FB_SIZE - fb->fb_used) <= 0) { if (si->si_needsHeaderSince) { write(fd, err400BadRequest, strlen(err400BadRequest)); CloseSock(fd); @@ -686,8 +727,8 @@ SocketReadyToRead(int fd) ClearFd(fd, &masterReadSet); return; } - } - + } + /* read as much as allowed, and is available */ if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) { CloseSock(fd); @@ -700,6 +741,7 @@ SocketReadyToRead(int fd) if (res == -1) { if (errno == EAGAIN) return; + TRACE("fd=%d errno=%d errstr=%s\n",fd, errno, strerror(errno)); CloseSock(fd); if (fb->fb_used == 0 && si->si_peerFd >= 0) { CloseSock(si->si_peerFd); @@ -709,7 +751,7 @@ SocketReadyToRead(int fd) } fb->fb_used += res; fb->fb_buf[fb->fb_used] = 0; /* terminate it for convenience */ - printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used); + // printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used); /* if we need header, check if we've gotten it. if so, do modifications and continue. if not, check if we've read the @@ -731,6 +773,25 @@ SocketReadyToRead(int fd) // printf("found service %d\n", whichService); slice = ServiceToSlice(whichService); + /* if it needs to be redirected to PLC, let it be handled here */ + if (whichService == 0 && domainNamePLCNetflow != NULL && + strcmp(slice->si_sliceName, "root") == 0) { + char msg[1024]; + int len; + static const char* resp302 = + "HTTP/1.0 302 Found\r\n" + "Location: http://%s\r\n" + "Cache-Control: no-cache, no-store\r\n" + "Content-type: text/html\r\n" + "Connection: close\r\n" + "\r\n" + "Your request is being redirected to PLC Netflow http://%s\n"; + len = snprintf(msg, sizeof(msg), resp302, + domainNamePLCNetflow, domainNamePLCNetflow); + write(fd, msg, len); + CloseSock(fd); + return; + } /* no service can have more than some absolute max number of connections. Also, when we're too busy, start enforcing fairness across the servers */ @@ -738,6 +799,7 @@ SocketReadyToRead(int fd) (numTotalSliceConns > FAIRNESS_CUTOFF && slice->si_numConns > MAX_CONNS/numActiveSlices)) { write(fd, err503TooBusy, strlen(err503TooBusy)); + TRACE("CloseSock(): fd=%d too busy\n", fd); CloseSock(fd); return; } @@ -758,6 +820,7 @@ SocketReadyToRead(int fd) numNeedingHeaders--; if (StartConnect(fd, whichService) != SUCCESS) { write(fd, err503Unavailable, strlen(err503Unavailable)); + TRACE("CloseSock(): fd=%d StartConnect() failed\n", fd); CloseSock(fd); return; } @@ -767,9 +830,13 @@ SocketReadyToRead(int fd) /* write anything possible */ if (WriteAvailData(si->si_peerFd) != SUCCESS) { /* assume the worst and close */ + TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", + fd, errno, strerror(errno)); CloseSock(fd); - CloseSock(si->si_peerFd); - si->si_peerFd = -1; + if (si->si_peerFd >=0) { + CloseSock(si->si_peerFd); + si->si_peerFd = -1; + } } } /*-----------------------------------------------------------------*/ @@ -790,6 +857,8 @@ SocketReadyToWrite(int fd) /* if we have data, write it */ if (WriteAvailData(fd) != SUCCESS) { /* assume the worst and close */ + TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n", + fd, errno, strerror(errno)); CloseSock(fd); if (si->si_peerFd >= 0) { CloseSock(si->si_peerFd); @@ -799,8 +868,9 @@ SocketReadyToWrite(int fd) } /* if peer is closed and we're done writing, we should close */ - if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0) + if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0) { CloseSock(fd); + } } /*-----------------------------------------------------------------*/ static void @@ -836,7 +906,7 @@ CloseReqlessConns(void) /* if it's too old, close it */ for (i = 0; i < highestSetFd+1; i++) { if (sockInfo[i].si_needsHeaderSince && - (now - sockInfo[i].si_needsHeaderSince) > maxAge) + (now - sockInfo[i].si_needsHeaderSince) > maxAge) CloseSock(i); } } @@ -910,6 +980,11 @@ MainLoop(int lisSock) socklen_t lenAddr = sizeof(addr); if ((newSock = accept(lisSock, (struct sockaddr *) &addr, &lenAddr)) >= 0) { + /* make socket non-blocking */ + if (fcntl(newSock, F_SETFL, O_NONBLOCK) < 0) { + close(newSock); + continue; + } memset(&sockInfo[newSock], 0, sizeof(SockInfo)); sockInfo[newSock].si_needsHeaderSince = now; numNeedingHeaders++; @@ -964,19 +1039,8 @@ static int OpenLogFile(void) { static const char* logfile = "/var/log/codemux.log"; - static const char* oldlogfile = "/var/log/codemux.log.old"; int logfd; - /* if the previous log file exists, - rename it to the oldlogfile */ - if (access(logfile, F_OK) == 0) { - if (rename(logfile, oldlogfile) < 0) { - fprintf(stderr, "cannot rotate the logfile err=%s\n", - strerror(errno)); - exit(-1); - } - } - logfd = open(logfile, O_WRONLY | O_APPEND | O_CREAT, 0600); if (logfd < 0) { fprintf(stderr, "cannot open the logfile err=%s\n", @@ -1006,9 +1070,29 @@ main(int argc, char *argv[]) { int lisSock; int logFd; + int doDaemon = 1; + int opt; + struct in_addr lisAddress = { .s_addr = htonl(INADDR_ANY) }; + + while ((opt = getopt(argc, argv, "dl:")) != -1) { + switch (opt) { + case 'd': + doDaemon = 0; + break; + case 'l': + if (inet_pton(AF_INET, optarg, &lisAddress) <= 0) { + fprintf(stderr, "`%s' is not a valid address\n", optarg); + exit(-1); + } + break; + default: + fprintf(stderr, "Usage: %s [-d] [-l ]\n", argv[0]); + exit(-1); + } + } /* do the daemon stuff */ - if (argc <= 1 || strcmp(argv[1], "-d") != 0) { + if (doDaemon) { if (InitDaemon() < 0) { fprintf(stderr, "codemux daemon_init() failed\n"); exit(-1); @@ -1016,7 +1100,8 @@ main(int argc, char *argv[]) } /* create the accept socket */ - if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE)) < 0) { + if ((lisSock = CreatePrivateAcceptSocket(DEMUX_PORT, TRUE, + &lisAddress)) < 0) { fprintf(stderr, "failed creating accept socket\n"); exit(-1); } @@ -1025,6 +1110,10 @@ main(int argc, char *argv[]) /* open the log file */ logFd = OpenLogFile(); + + /* write down the version */ + fprintf(stderr, "CoDemux version %s started\n", CODEMUX_VERSION); + while (1) { numForks++; if (fork()) {