git://git.onelab.eu
/
codemux.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
- sets client-side socket non-blocking
[codemux.git]
/
codemux.c
diff --git
a/codemux.c
b/codemux.c
index
3ca9ba0
..
34ffc30
100644
(file)
--- a/
codemux.c
+++ b/
codemux.c
@@
-15,6
+15,14
@@
#include <string.h>
#include "codemuxlib.h"
#include <string.h>
#include "codemuxlib.h"
+#define DEBUG 0
+
+#ifdef DEBUG
+#define TRACE(fmt, msg...) fprintf(stderr, "[%s,%d] " fmt, __FUNCTION__, __LINE__, ##msg)
+#else
+#define TRACE(fmt, msg...) (void)0
+#endif
+
#define CONF_FILE "/etc/codemux/codemux.conf"
#define DEMUX_PORT 80
#define PIDFILE "/var/run/codemux.pid"
#define CONF_FILE "/etc/codemux/codemux.conf"
#define DEMUX_PORT 80
#define PIDFILE "/var/run/codemux.pid"
@@
-31,6
+39,8
@@
among them */
#define FAIRNESS_CUTOFF (MAX_CONNS * 0.85)
among them */
#define FAIRNESS_CUTOFF (MAX_CONNS * 0.85)
+/* codemux version */
+#define CODEMUX_VERSION "0.3"
typedef struct FlowBuf {
int fb_refs; /* num refs */
typedef struct FlowBuf {
int fb_refs; /* num refs */
@@
-107,8
+117,10
@@
DumpStatus(int fd)
int len;
sprintf(start,
int len;
sprintf(start,
+ "CoDemux version %s\n"
"numForks %d, numActiveSlices %d, numTotalSliceConns %d\n"
"numNeedingHeaders %d, anySliceXidsNeeded %d\n",
"numForks %d, numActiveSlices %d, numTotalSliceConns %d\n"
"numNeedingHeaders %d, anySliceXidsNeeded %d\n",
+ CODEMUX_VERSION,
numForks, numActiveSlices, numTotalSliceConns,
numNeedingHeaders, anySliceXidsNeeded);
start += strlen(start);
numForks, numActiveSlices, numTotalSliceConns,
numNeedingHeaders, anySliceXidsNeeded);
start += strlen(start);
@@
-645,6
+657,10
@@
ReallyCloseSocks(void)
SliceConnsDec(sockInfo[fd].si_whichService);
sockInfo[fd].si_whichService = -1;
}
SliceConnsDec(sockInfo[fd].si_whichService);
sockInfo[fd].si_whichService = -1;
}
+ /* KyoungSoo*/
+ if (sockInfo[fd].si_peerFd >= 0) {
+ sockInfo[sockInfo[fd].si_peerFd].si_peerFd = -1;
+ }
}
numSocksToClose = 0;
}
}
numSocksToClose = 0;
}
@@
-676,7
+692,7
@@
SocketReadyToRead(int fd)
fb->fb_buf = malloc(FB_ALLOCSIZE);
/* determine read buffer size - if 0, then block reads and return */
fb->fb_buf = malloc(FB_ALLOCSIZE);
/* determine read buffer size - if 0, then block reads and return */
- if ((spaceLeft = FB_SIZE - fb->fb_used) < 0) {
+ if ((spaceLeft = FB_SIZE - fb->fb_used) <
=
0) {
if (si->si_needsHeaderSince) {
write(fd, err400BadRequest, strlen(err400BadRequest));
CloseSock(fd);
if (si->si_needsHeaderSince) {
write(fd, err400BadRequest, strlen(err400BadRequest));
CloseSock(fd);
@@
-686,8
+702,8
@@
SocketReadyToRead(int fd)
ClearFd(fd, &masterReadSet);
return;
}
ClearFd(fd, &masterReadSet);
return;
}
- }
-
+ }
+
/* read as much as allowed, and is available */
if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) {
CloseSock(fd);
/* read as much as allowed, and is available */
if ((res = read(fd, &fb->fb_buf[fb->fb_used], spaceLeft)) == 0) {
CloseSock(fd);
@@
-700,6
+716,7
@@
SocketReadyToRead(int fd)
if (res == -1) {
if (errno == EAGAIN)
return;
if (res == -1) {
if (errno == EAGAIN)
return;
+ TRACE("fd=%d errno=%d errstr=%s\n",fd, errno, strerror(errno));
CloseSock(fd);
if (fb->fb_used == 0 && si->si_peerFd >= 0) {
CloseSock(si->si_peerFd);
CloseSock(fd);
if (fb->fb_used == 0 && si->si_peerFd >= 0) {
CloseSock(si->si_peerFd);
@@
-709,7
+726,7
@@
SocketReadyToRead(int fd)
}
fb->fb_used += res;
fb->fb_buf[fb->fb_used] = 0; /* terminate it for convenience */
}
fb->fb_used += res;
fb->fb_buf[fb->fb_used] = 0; /* terminate it for convenience */
- printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
+
//
printf("sock %d, read %d, total %d\n", fd, res, fb->fb_used);
/* if we need header, check if we've gotten it. if so, do
modifications and continue. if not, check if we've read the
/* if we need header, check if we've gotten it. if so, do
modifications and continue. if not, check if we've read the
@@
-738,6
+755,7
@@
SocketReadyToRead(int fd)
(numTotalSliceConns > FAIRNESS_CUTOFF &&
slice->si_numConns > MAX_CONNS/numActiveSlices)) {
write(fd, err503TooBusy, strlen(err503TooBusy));
(numTotalSliceConns > FAIRNESS_CUTOFF &&
slice->si_numConns > MAX_CONNS/numActiveSlices)) {
write(fd, err503TooBusy, strlen(err503TooBusy));
+ TRACE("CloseSock(): fd=%d too busy\n", fd);
CloseSock(fd);
return;
}
CloseSock(fd);
return;
}
@@
-758,6
+776,7
@@
SocketReadyToRead(int fd)
numNeedingHeaders--;
if (StartConnect(fd, whichService) != SUCCESS) {
write(fd, err503Unavailable, strlen(err503Unavailable));
numNeedingHeaders--;
if (StartConnect(fd, whichService) != SUCCESS) {
write(fd, err503Unavailable, strlen(err503Unavailable));
+ TRACE("CloseSock(): fd=%d StartConnect() failed\n", fd);
CloseSock(fd);
return;
}
CloseSock(fd);
return;
}
@@
-767,9
+786,13
@@
SocketReadyToRead(int fd)
/* write anything possible */
if (WriteAvailData(si->si_peerFd) != SUCCESS) {
/* assume the worst and close */
/* write anything possible */
if (WriteAvailData(si->si_peerFd) != SUCCESS) {
/* assume the worst and close */
+ TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n",
+ fd, errno, strerror(errno));
CloseSock(fd);
CloseSock(fd);
- CloseSock(si->si_peerFd);
- si->si_peerFd = -1;
+ if (si->si_peerFd >=0) {
+ CloseSock(si->si_peerFd);
+ si->si_peerFd = -1;
+ }
}
}
/*-----------------------------------------------------------------*/
}
}
/*-----------------------------------------------------------------*/
@@
-790,6
+813,8
@@
SocketReadyToWrite(int fd)
/* if we have data, write it */
if (WriteAvailData(fd) != SUCCESS) {
/* assume the worst and close */
/* if we have data, write it */
if (WriteAvailData(fd) != SUCCESS) {
/* assume the worst and close */
+ TRACE("CloseSock(): fd=%d WriteAvailData() failed errno=%d errstr=%s\n",
+ fd, errno, strerror(errno));
CloseSock(fd);
if (si->si_peerFd >= 0) {
CloseSock(si->si_peerFd);
CloseSock(fd);
if (si->si_peerFd >= 0) {
CloseSock(si->si_peerFd);
@@
-799,8
+824,9
@@
SocketReadyToWrite(int fd)
}
/* if peer is closed and we're done writing, we should close */
}
/* if peer is closed and we're done writing, we should close */
- if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0)
+ if (si->si_peerFd < 0 && si->si_writeBuf->fb_used == 0)
{
CloseSock(fd);
CloseSock(fd);
+ }
}
/*-----------------------------------------------------------------*/
static void
}
/*-----------------------------------------------------------------*/
static void
@@
-836,7
+862,7
@@
CloseReqlessConns(void)
/* if it's too old, close it */
for (i = 0; i < highestSetFd+1; i++) {
if (sockInfo[i].si_needsHeaderSince &&
/* if it's too old, close it */
for (i = 0; i < highestSetFd+1; i++) {
if (sockInfo[i].si_needsHeaderSince &&
- (now - sockInfo[i].si_needsHeaderSince) > maxAge)
+ (now - sockInfo[i].si_needsHeaderSince) > maxAge)
CloseSock(i);
}
}
CloseSock(i);
}
}
@@
-910,6
+936,11
@@
MainLoop(int lisSock)
socklen_t lenAddr = sizeof(addr);
if ((newSock = accept(lisSock, (struct sockaddr *) &addr,
&lenAddr)) >= 0) {
socklen_t lenAddr = sizeof(addr);
if ((newSock = accept(lisSock, (struct sockaddr *) &addr,
&lenAddr)) >= 0) {
+ /* make socket non-blocking */
+ if (fcntl(newSock, F_SETFL, O_NONBLOCK) < 0) {
+ close(newSock);
+ continue;
+ }
memset(&sockInfo[newSock], 0, sizeof(SockInfo));
sockInfo[newSock].si_needsHeaderSince = now;
numNeedingHeaders++;
memset(&sockInfo[newSock], 0, sizeof(SockInfo));
sockInfo[newSock].si_needsHeaderSince = now;
numNeedingHeaders++;
@@
-1025,6
+1056,10
@@
main(int argc, char *argv[])
/* open the log file */
logFd = OpenLogFile();
/* open the log file */
logFd = OpenLogFile();
+
+ /* write down the version */
+ fprintf(stderr, "CoDemux version %s started\n", CODEMUX_VERSION);
+
while (1) {
numForks++;
if (fork()) {
while (1) {
numForks++;
if (fork()) {