X-Git-Url: http://git.onelab.eu/?p=fprobe-ulog.git;a=blobdiff_plain;f=src%2Ffprobe-ulog.c;h=bca3b5d3253759d39e55b033d86cbb4893e04fb3;hp=8c66067b2ba862ca5bcd5ef05fc51c1b3d075e12;hb=466dcb566b08ec31ac3eb9946ec986b9e9f8c9c1;hpb=164b27f0c908f203af585fcd999033b62ba54a22 diff --git a/src/fprobe-ulog.c b/src/fprobe-ulog.c index 8c66067..bca3b5d 100644 --- a/src/fprobe-ulog.c +++ b/src/fprobe-ulog.c @@ -8,7 +8,7 @@ Sapan Bhatia - 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file + 7/11/2007 Added data collection (-f) functionality, slice_id support in the header and log file rotation. 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility. @@ -37,6 +37,8 @@ #include #include +#include "vserver.h" + struct ipulog_handle { int fd; u_int8_t blocking; @@ -95,7 +97,9 @@ struct ipulog_handle { #include #include -#define PIDFILE "/var/log/fprobe-ulog.pid" +#define PIDFILE "/var/log/fprobe-ulog.pid" +#define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch" +#define MAX_EPOCH_SIZE sizeof("32767") #define STD_NETFLOW_PDU enum { @@ -162,10 +166,10 @@ extern struct NetFlow NetFlow1; extern struct NetFlow NetFlow5; extern struct NetFlow NetFlow7; -#define START_VALUE -5 +#define START_DATA_FD -5 #define mark_is_tos parms[Mflag].count static unsigned scan_interval = 5; -static int min_free = 0; +static unsigned int min_free = 0; static int frag_lifetime = 30; static int inactive_lifetime = 60; static int active_lifetime = 300; @@ -178,7 +182,7 @@ static int sockbufsize; #endif static unsigned epoch_length=60, log_epochs=1; -static unsigned cur_epoch=0,prev_uptime=0; +static unsigned cur_epoch=0,prev_uptime=0,last_peak=0; static unsigned bulk_quantity = BULK_QUANTITY; static unsigned pending_queue_length = 100; @@ -213,6 +217,7 @@ static int sched_min, sched_max; static int npeers, npeers_rot; static struct peer *peers; static int sigs; +static char cur_output_file[MAX_PATH_LEN]; static struct Flow *flows[1 << HASH_BITS]; static pthread_mutex_t flows_mutex[1 << HASH_BITS]; @@ -245,37 +250,37 @@ static struct passwd *pw = 0; void usage() { fprintf(stdout, - "fprobe-ulog: a NetFlow probe. Version %s\n" - "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n" - "\n" - "-h\t\tDisplay this help\n" - "-U \tULOG group bitwise mask [1]\n" - "-s \tHow often scan for expired flows [5]\n" - "-g \tFragmented flow lifetime [30]\n" - "-e \tActive flow lifetime (active timer) [300]\n" - "-f \tLog flow data in a file\n" - "-G \tRotate logs on an 0-hourly, 1-daily basis\n" - "-n \tNetFlow version for use (1, 5 or 7) [5]\n" - "-a
\tUse
as source for NetFlow flow\n" - "-X \tInterface name to SNMP-index conversion rules\n" - "-M\t\tUse netfilter mark value as ToS flag\n" - "-b \tMemory bulk size (1..%u) [%u]\n" - "-m \tMemory limit (0=no limit) [0]\n" - "-q \tPending queue length [100]\n" - "-B \tKernel capture buffer size [0]\n" - "-r \tReal-time priority (0=disabled, %d..%d) [0]\n" - "-t \tProduce nanosecond delay after each bytes sent [0:0]\n" - "-c \tDirectory to chroot to\n" - "-u \tUser to run as\n" - "-v \tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n" - "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n" - "-y \tAddress of the NetFlow collector\n" - "-f \tFile to write data into\n" - "-T \tRotate log file every n epochs\n" - "-W \tSet current epoch to n. Useful when restarting fprobe\n" - "-E <[1..60]>\tSize of an epoch in minutes\n" - "-D \tNumber of disk blocks to preserve as free space\n" - , + "fprobe-ulog: a NetFlow probe. Version %s\n" + "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n" + "\n" + "-h\t\tDisplay this help\n" + "-U \tULOG group bitwise mask [1]\n" + "-s \tHow often scan for expired flows [5]\n" + "-g \tFragmented flow lifetime [30]\n" + "-e \tActive flow lifetime (active timer) [300]\n" + "-f \tLog flow data in a file\n" + "-G \tRotate logs on an 0-hourly, 1-daily basis\n" + "-n \tNetFlow version for use (1, 5 or 7) [5]\n" + "-a
\tUse
as source for NetFlow flow\n" + "-X \tInterface name to SNMP-index conversion rules\n" + "-M\t\tUse netfilter mark value as ToS flag\n" + "-b \tMemory bulk size (1..%u) [%u]\n" + "-m \tMemory limit (0=no limit) [0]\n" + "-q \tPending queue length [100]\n" + "-B \tKernel capture buffer size [0]\n" + "-r \tReal-time priority (0=disabled, %d..%d) [0]\n" + "-t \tProduce nanosecond delay after each bytes sent [0:0]\n" + "-c \tDirectory to chroot to\n" + "-u \tUser to run as\n" + "-v \tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n" + "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n" + "-y \tAddress of the NetFlow collector\n" + "-f \tFile to write data into\n" + "-T \tRotate log file every n epochs\n" + "-W \tSet current epoch to n. Useful when restarting fprobe\n" + "-E <[1..60]>\tSize of an epoch in minutes\n" + "-D \tNumber of disk blocks to preserve as free space\n" + , VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max); exit(0); } @@ -284,14 +289,14 @@ void usage() void info_debug() { my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d", - pkts_total, pkts_total_fragmented, size_total, - pkts_pending - pkts_pending_done, pending_queue_trace); + pkts_total, pkts_total_fragmented, size_total, + pkts_pending - pkts_pending_done, pending_queue_trace); my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d", - pkts_ignored, pkts_lost_capture, pkts_lost_unpending); + pkts_ignored, pkts_lost_capture, pkts_lost_unpending); my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d", - flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue); + flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue); my_log(LOG_DEBUG, "I: memory:%d/%d (%d)", - total_elements, free_elements, total_memory); + total_elements, free_elements, total_memory); } #endif @@ -371,6 +376,7 @@ inline void copy_flow(struct Flow *src, struct Flow *dst) dst->sip = src->sip; dst->dip = src->dip; dst->tos = src->tos; + dst->slice_id = src->slice_id; dst->proto = src->proto; dst->tcp_flags = src->tcp_flags; dst->id = src->id; @@ -385,13 +391,15 @@ inline void copy_flow(struct Flow *src, struct Flow *dst) dst->flags = src->flags; } -void get_cur_epoch() { +void read_cur_epoch() { int fd; - fd = open("/tmp/fprobe_last_epoch",O_RDONLY); + /* Reset to -1 in case the read fails */ + cur_epoch=-1; + fd = open(LAST_EPOCH_FILE, O_RDONLY); if (fd != -1) { - char snum[7]; + char snum[MAX_EPOCH_SIZE]; ssize_t len; - len = read(fd, snum, sizeof(snum)-1); + len = read(fd, snum, MAX_EPOCH_SIZE-1); if (len != -1) { snum[len]='\0'; sscanf(snum,"%d",&cur_epoch); @@ -403,61 +411,81 @@ void get_cur_epoch() { } +/* Dumps the current epoch in a file to cope with + * reboots and killings of fprobe */ + void update_cur_epoch_file(int n) { int fd, len; - char snum[7]; - len=snprintf(snum,6,"%d",n); - fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC); + char snum[MAX_EPOCH_SIZE]; + len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n); + fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC); if (fd == -1) { - my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0."); + my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE); return; } write(fd, snum, len); close(fd); } -unsigned get_log_fd(char *fname, int cur_fd) { +/* Get the file descriptor corresponding to the current file. + * The kludgy implementation is to abstract away the 'current + * file descriptor', which may also be a socket. + */ + +unsigned get_data_file_fd(char *fname, int cur_fd) { struct Time now; unsigned cur_uptime; - /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that - * doesn't solve the problem */ struct statfs statfs; int ret_fd; + + /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that + * doesn't solve the problem */ gettime(&now); cur_uptime = getuptime_minutes(&now); - - if (cur_fd!=START_VALUE) { + if (cur_fd != START_DATA_FD) { if (fstatfs(cur_fd, &statfs) == -1) { my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks"); } else { - if (min_free && (statfs.f_bavail < min_free)) - switch(cur_epoch) { - case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */ - my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out."); - exit(1); - default: - my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch); - cur_epoch = -1; - } + if (min_free && (statfs.f_bavail < min_free) + && (cur_epoch==last_peak)) + { + my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch); + cur_epoch = -1; + } + /* + else + assume that we can reclaim space by overwriting our own files + and that the difference in size will not fill the disk - sapan + */ } } - /* Epoch length in minutes */ + /* If epoch length has been exceeded, + * or we're starting up + * or we're going back to the first epoch */ if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) { - char nextname[MAX_PATH_LEN]; int write_fd; prev_uptime = cur_uptime; cur_epoch = (cur_epoch + 1) % log_epochs; - if (cur_fd>0) + if (cur_epoch>last_peak) last_peak = cur_epoch; + if (cur_fd>0) { close(cur_fd); - snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); - if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) { - my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno)); + /* Compress the finished file */ + char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")]; + snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file); + system(gzip_cmd); + } + snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); + if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC)) < 0) { + my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno)); exit(1); } + if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) { + my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno)); + } update_cur_epoch_file(cur_epoch); ret_fd = write_fd; } @@ -479,13 +507,13 @@ struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev) while (where) { if (where->sip.s_addr == what->sip.s_addr - && where->dip.s_addr == what->dip.s_addr - && where->proto == what->proto) { + && where->dip.s_addr == what->dip.s_addr + && where->proto == what->proto) { switch ((what->flags + where->flags) & FLOW_FRAGMASK) { case 0: /* Both unfragmented */ if ((what->sp == where->sp) - && (what->dp == where->dp)) goto done; + && (what->dp == where->dp)) goto done; break; case 2: /* Both fragmented */ @@ -501,11 +529,11 @@ done: return where; } -int put_into(struct Flow *flow, int flag + int put_into(struct Flow *flow, int flag #if ((DEBUG) & (DEBUG_S | DEBUG_U)) - , char *logbuf + , char *logbuf #endif -) + ) { int ret = 0; hash_t h; @@ -530,7 +558,7 @@ int put_into(struct Flow *flow, int flag } else { #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES my_log(LOG_ERR, "%s %s. %s", - "mem_alloc():", strerror(errno), "packet lost"); + "mem_alloc():", strerror(errno), "packet lost"); #endif return -1; } @@ -560,27 +588,36 @@ int put_into(struct Flow *flow, int flag flown->tcp_flags |= flow->tcp_flags; flown->size += flow->size; flown->pkts += flow->pkts; + + /* The slice_id of the first slice_id of a flow is misleading. Reset the slice_id of the flow + * if a better value comes along. A good example of this is that by the time CoDemux sets the + * peercred of a flow, it has already been accounted for here and attributed to root. */ + + if (flown->slice_id<1) + flown->slice_id = flow->slice_id; + + if (flow->flags & FLOW_FRAG) { /* Fragmented flow require some additional work */ if (flow->flags & FLOW_TL) { /* - ?FIXME? - Several packets with FLOW_TL (attack) - */ + ?FIXME? + Several packets with FLOW_TL (attack) + */ flown->sp = flow->sp; flown->dp = flow->dp; } if (flow->flags & FLOW_LASTFRAG) { /* - ?FIXME? - Several packets with FLOW_LASTFRAG (attack) - */ + ?FIXME? + Several packets with FLOW_LASTFRAG (attack) + */ flown->sizeP = flow->sizeP; } flown->flags |= flow->flags; flown->sizeF += flow->sizeF; if ((flown->flags & FLOW_LASTFRAG) - && (flown->sizeF >= flown->sizeP)) { + && (flown->sizeF >= flown->sizeP)) { /* All fragments received - flow reassembled */ *flowpp = flown->next; pthread_mutex_unlock(&flows_mutex[h]); @@ -597,7 +634,7 @@ int put_into(struct Flow *flow, int flag #if ((DEBUG) & (DEBUG_U | DEBUG_S)) , logbuf #endif - ); + ); } } if (flag == MOVE_INTO) mem_free(flow); @@ -606,6 +643,8 @@ int put_into(struct Flow *flow, int flag return ret; } +int onlyonce=0; + void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) { int i; @@ -622,7 +661,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) case NETFLOW_IPV4_DST_ADDR: ((struct in_addr *) p)->s_addr = flow->dip.s_addr; - if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) { + if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) { my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts); } p += NETFLOW_IPV4_DST_ADDR_SIZE; @@ -715,21 +754,25 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) break; case NETFLOW_PAD8: - /* Unsupported (uint8_t) */ + /* Unsupported (uint8_t) */ case NETFLOW_ENGINE_TYPE: case NETFLOW_ENGINE_ID: case NETFLOW_FLAGS7_1: case NETFLOW_SRC_MASK: case NETFLOW_DST_MASK: + if (onlyonce) { + my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n"); + onlyonce=1; + } *((uint8_t *) p) = 0; p += NETFLOW_PAD8_SIZE; break; - case NETFLOW_XID: - *((uint16_t *) p) = flow->tos; - p += NETFLOW_XID_SIZE; + case NETFLOW_SLICE_ID: + *((uint32_t *) p) = flow->slice_id; + p += NETFLOW_SLICE_ID_SIZE; break; case NETFLOW_PAD16: - /* Unsupported (uint16_t) */ + /* Unsupported (uint16_t) */ case NETFLOW_SRC_AS: case NETFLOW_DST_AS: case NETFLOW_FLAGS7_2: @@ -738,7 +781,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) break; case NETFLOW_PAD32: - /* Unsupported (uint32_t) */ + /* Unsupported (uint32_t) */ case NETFLOW_IPV4_NEXT_HOP: case NETFLOW_ROUTER_SC: *((uint32_t *) p) = 0; @@ -747,7 +790,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) default: my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d", - format, i, format[i]); + format, i, format[i]); exit(1); } } @@ -759,9 +802,9 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) void setuser() { /* - Workaround for clone()-based threads - Try to change EUID independently of main thread - */ + Workaround for clone()-based threads + Try to change EUID independently of main thread + */ if (pw) { setgroups(0, NULL); setregid(pw->pw_gid, pw->pw_gid); @@ -781,6 +824,8 @@ void *emit_thread() timeout.tv_nsec = 0; setuser(); + + //pthread_mutexattr_setprotocol(&md->MutexAttr,PTHREAD_PRIO_INHERIT); for (;;) { pthread_mutex_lock(&emit_mutex); @@ -789,6 +834,7 @@ void *emit_thread() timeout.tv_sec = now.tv_sec + emit_timeout; /* Do not wait until emit_packet will filled - it may be too long */ if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) { + my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec); pthread_mutex_unlock(&emit_mutex); goto sendit; } @@ -814,7 +860,7 @@ void *emit_thread() fflush(stdout); #endif if (emit_count == netflow->MaxFlows) { - sendit: +sendit: gettime(&emit_time); p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet); size = netflow->HeaderSize + emit_count * netflow->FlowSize; @@ -825,73 +871,73 @@ void *emit_thread() peer_rot_cur = 0; for (i = 0; i < npeers; i++) { if (peers[i].type == PEER_FILE) { - if (netflow->SeqOffset) - *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq); - peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd); - ret = write(peers[i].write_fd, emit_packet, size); - if (ret < size) { + if (netflow->SeqOffset) + *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq); + peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd); + ret = write(peers[i].write_fd, emit_packet, size); + if (ret < size) { #if ((DEBUG) & DEBUG_E) || defined MESSAGES - my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s", + my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s", i + 1, peers[i].seq, emit_count, size, ret, strerror(errno)); #endif #undef MESSAGES - } + } #if ((DEBUG) & DEBUG_E) - commaneelse { - my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", + else { + my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", emit_count, i + 1, peers[i].seq); - } + } #endif - peers[i].seq += emit_count; - - /* Rate limit */ - if (emit_rate_bytes) { - sent += size; - delay = sent / emit_rate_bytes; - if (delay) { - sent %= emit_rate_bytes; - timeout.tv_sec = 0; - timeout.tv_nsec = emit_rate_delay * delay; - while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR); - } + peers[i].seq += emit_count; + + /* Rate limit */ + if (emit_rate_bytes) { + sent += size; + delay = sent / emit_rate_bytes; + if (delay) { + sent %= emit_rate_bytes; + timeout.tv_sec = 0; + timeout.tv_nsec = emit_rate_delay * delay; + while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR); } } + } else - if (peers[i].type == PEER_MIRROR) goto sendreal; - else - if (peers[i].type == PEER_ROTATE) - if (peer_rot_cur++ == peer_rot_work) { - sendreal: - if (netflow->SeqOffset) - *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq); - ret = send(peers[i].write_fd, emit_packet, size, 0); - if (ret < size) { + if (peers[i].type == PEER_MIRROR) goto sendreal; + else + if (peers[i].type == PEER_ROTATE) + if (peer_rot_cur++ == peer_rot_work) { +sendreal: + if (netflow->SeqOffset) + *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq); + ret = send(peers[i].write_fd, emit_packet, size, 0); + if (ret < size) { #if ((DEBUG) & DEBUG_E) || defined MESSAGES - my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s", - i + 1, peers[i].seq, emit_count, size, ret, strerror(errno)); + my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s", + i + 1, peers[i].seq, emit_count, size, ret, strerror(errno)); #endif - } + } #if ((DEBUG) & DEBUG_E) - commaneelse { - my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", - emit_count, i + 1, peers[i].seq); - } + else { + my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d", + emit_count, i + 1, peers[i].seq); + } #endif - peers[i].seq += emit_count; - - /* Rate limit */ - if (emit_rate_bytes) { - sent += size; - delay = sent / emit_rate_bytes; - if (delay) { - sent %= emit_rate_bytes; - timeout.tv_sec = 0; - timeout.tv_nsec = emit_rate_delay * delay; - while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR); + peers[i].seq += emit_count; + + /* Rate limit */ + if (emit_rate_bytes) { + sent += size; + delay = sent / emit_rate_bytes; + if (delay) { + sent %= emit_rate_bytes; + timeout.tv_sec = 0; + timeout.tv_nsec = emit_rate_delay * delay; + while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR); + } + } } - } - } } if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot; emit_sequence += emit_count; @@ -928,9 +974,9 @@ void *unpending_thread() #endif if (put_into(pending_tail, COPY_INTO #if ((DEBUG) & (DEBUG_S | DEBUG_U)) - , logbuf + , logbuf #endif - ) < 0) { + ) < 0) { #if ((DEBUG) & DEBUG_I) pkts_lost_unpending++; #endif @@ -996,7 +1042,7 @@ void *scan_thread() } else { /* Flow is not frgamented */ if ((now.sec - flow->mtime.sec) > inactive_lifetime - || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) { + || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) { /* Flow expired */ #if ((DEBUG) & DEBUG_S) my_log(LOG_DEBUG, "S: E %x", flow); @@ -1031,9 +1077,9 @@ void *scan_thread() #endif put_into(flow, MOVE_INTO #if ((DEBUG) & (DEBUG_S | DEBUG_U)) - , logbuf + , logbuf #endif - ); + ); #if ((DEBUG) & DEBUG_S) my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf); #endif @@ -1052,6 +1098,7 @@ void *cap_thread() char buf[64]; char logbuf[256]; #endif + int challenge; setuser(); @@ -1086,11 +1133,11 @@ void *cap_thread() #if ((DEBUG) & DEBUG_C) || defined MESSAGES my_log(LOG_ERR, # if ((DEBUG) & DEBUG_C) - "%s %s %s", logbuf, + "%s %s %s", logbuf, # else - "%s %s", + "%s %s", # endif - "pending queue full:", "packet lost"); + "pending queue full:", "packet lost"); #endif #if ((DEBUG) & DEBUG_I) pkts_lost_capture++; @@ -1112,12 +1159,26 @@ void *cap_thread() flow->sip = nl->ip_src; flow->dip = nl->ip_dst; - if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) { - my_log(LOG_INFO, "Received test flow to corewars.org"); + flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos; + + /* It's going to be expensive calling this syscall on every flow. + * We should keep a local hash table, for now just bear the overhead... - Sapan*/ + + flow->slice_id=0; + + if (ulog_msg->mark > 0) { + flow->slice_id = xid_to_slice_id(ulog_msg->mark); + } + + if (flow->slice_id < 1) + flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid + + + if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) { + my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id); } flow->iif = snmp_index(ulog_msg->indev_name); flow->oif = snmp_index(ulog_msg->outdev_name); - flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos; flow->proto = nl->ip_p; flow->id = 0; flow->tcp_flags = 0; @@ -1134,12 +1195,12 @@ void *cap_thread() off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3; /* - Offset (from network layer) to transport layer header/IP data - IOW IP header size ;-) + Offset (from network layer) to transport layer header/IP data + IOW IP header size ;-) - ?FIXME? - Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks... - */ + ?FIXME? + Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks... + */ off_tl = nl->ip_hl << 2; tl = (void *) nl + off_tl; @@ -1176,12 +1237,12 @@ void *cap_thread() #endif /* - Fortunately most interesting transport layer information fit - into first 8 bytes of IP data field (minimal nonzero size). - Thus we don't need actual packet reassembling to build whole - transport layer data. We only check the fragment offset for - zero value to find packet with this information. - */ + Fortunately most interesting transport layer information fit + into first 8 bytes of IP data field (minimal nonzero size). + Thus we don't need actual packet reassembling to build whole + transport layer data. We only check the fragment offset for + zero value to find packet with this information. + */ if (!off_frag && psize >= 8) { switch (flow->proto) { case IPPROTO_TCP: @@ -1211,7 +1272,7 @@ void *cap_thread() flow->dp = 0; break; - tl_known: +tl_known: #if ((DEBUG) & DEBUG_C) sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp)); strcat(logbuf, buf); @@ -1222,8 +1283,8 @@ void *cap_thread() /* Check for tcp flags presence (including CWR and ECE). */ if (flow->proto == IPPROTO_TCP - && off_frag < 16 - && psize >= 16 - off_frag) { + && off_frag < 16 + && psize >= 16 - off_frag) { flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag)); #if ((DEBUG) & DEBUG_C) sprintf(buf, " TCP:%x", flow->tcp_flags); @@ -1247,7 +1308,7 @@ void *cap_thread() /* Flow complete - inform unpending_thread() about it */ pending_head->flags |= FLOW_PENDING; pending_head = pending_head->next; - done: +done: pthread_cond_signal(&unpending_cond); } } @@ -1257,39 +1318,39 @@ void *cap_thread() /* Copied out of CoDemux */ static int init_daemon() { - pid_t pid; - FILE *pidfile; - - pidfile = fopen(PIDFILE, "w"); - if (pidfile == NULL) { - my_log(LOG_ERR, "%s creation failed\n", PIDFILE); - } - - if ((pid = fork()) < 0) { - fclose(pidfile); - my_log(LOG_ERR, "Could not fork!\n"); - return(-1); - } - else if (pid != 0) { - /* i'm the parent, writing down the child pid */ - fprintf(pidfile, "%u\n", pid); - fclose(pidfile); - exit(0); - } - - /* close the pid file */ - fclose(pidfile); - - /* routines for any daemon process - 1. create a new session - 2. change directory to the root - 3. change the file creation permission - */ - setsid(); - chdir("/usr/local/fprobe"); - umask(0); - - return(0); + pid_t pid; + FILE *pidfile; + + pidfile = fopen(PIDFILE, "w"); + if (pidfile == NULL) { + my_log(LOG_ERR, "%s creation failed\n", PIDFILE); + } + + if ((pid = fork()) < 0) { + fclose(pidfile); + my_log(LOG_ERR, "Could not fork!\n"); + return(-1); + } + else if (pid != 0) { + /* i'm the parent, writing down the child pid */ + fprintf(pidfile, "%u\n", pid); + fclose(pidfile); + exit(0); + } + + /* close the pid file */ + fclose(pidfile); + + /* routines for any daemon process + 1. create a new session + 2. change directory to the root + 3. change the file creation permission + */ + setsid(); + chdir("/var/local/fprobe"); + umask(0); + + return(0); } int main(int argc, char **argv) @@ -1366,7 +1427,7 @@ int main(int argc, char **argv) if (log_suffix) *--log_suffix = ':'; } if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) { - err_malloc: +err_malloc: fprintf(stderr, "malloc(): %s\n", strerror(errno)); exit(1); } @@ -1381,8 +1442,8 @@ int main(int argc, char **argv) if (parms[rflag].count) { schedp.sched_priority = atoi(parms[rflag].arg); if (schedp.sched_priority - && (schedp.sched_priority < sched_min - || schedp.sched_priority > sched_max)) { + && (schedp.sched_priority < sched_min + || schedp.sched_priority > sched_max)) { fprintf(stderr, "Illegal %s\n", "realtime priority"); exit(1); } @@ -1427,7 +1488,7 @@ int main(int argc, char **argv) sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay); if (parms[aflag].count) { if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) { - bad_lhost: +bad_lhost: fprintf(stderr, "Illegal %s\n", "source address"); exit(1); } else { @@ -1513,12 +1574,12 @@ bad_collector: if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc; if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc; strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN); - - peers[npeers].write_fd = START_VALUE; + + peers[npeers].write_fd = START_DATA_FD; peers[npeers].type = PEER_FILE; peers[npeers].seq = 0; - get_cur_epoch(); + read_cur_epoch(); npeers++; } else @@ -1529,12 +1590,12 @@ bad_collector: ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE); if (!ulog_handle) { fprintf(stderr, "libipulog initialization error: %s", - ipulog_strerror(ipulog_errno)); + ipulog_strerror(ipulog_errno)); exit(1); } if (sockbufsize) if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF, - &sockbufsize, sizeof(sockbufsize)) < 0) + &sockbufsize, sizeof(sockbufsize)) < 0) fprintf(stderr, "setsockopt(): %s", strerror(errno)); /* Daemonize (if log destination stdout-free) */ @@ -1548,15 +1609,15 @@ bad_collector: while (1) { int pid=fork(); if (pid==-1) { - fprintf(stderr, "fork(): %s", strerror(errno)); - exit(1); + fprintf(stderr, "fork(): %s", strerror(errno)); + exit(1); } else if (pid==0) { - setsid(); - freopen("/dev/null", "r", stdin); - freopen("/dev/null", "w", stdout); - freopen("/dev/null", "w", stderr); - break; + setsid(); + freopen("/dev/null", "r", stdin); + freopen("/dev/null", "w", stdout); + freopen("/dev/null", "w", stderr); + break; } else { while (wait3(NULL,0,NULL) < 1); @@ -1573,6 +1634,7 @@ bad_collector: /* Initialization */ + init_slice_id_hash(); hash_init(); /* Actually for crc16 only */ mem_init(sizeof(struct Flow), bulk_quantity, memory_limit); for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0); @@ -1586,13 +1648,13 @@ bad_collector: gettime(&start_time); /* - Build static pending queue as circular buffer. - */ + Build static pending queue as circular buffer. + */ if (!(pending_head = mem_alloc())) goto err_mem_alloc; pending_tail = pending_head; for (i = pending_queue_length - 1; i--;) { if (!(pending_tail->next = mem_alloc())) { - err_mem_alloc: +err_mem_alloc: my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno)); exit(1); } @@ -1630,7 +1692,7 @@ bad_collector: for (i = 0; i < THREADS - 1; i++) { if (schedp.sched_priority > 0) { if ((pthread_attr_setschedpolicy(&tattr, SCHED)) || - (pthread_attr_setschedparam(&tattr, &schedp))) { + (pthread_attr_setschedparam(&tattr, &schedp))) { my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno)); exit(1); } @@ -1667,15 +1729,15 @@ bad_collector: my_log(LOG_INFO, "pid: %d", pid); my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s " - "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s", - ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime, - netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity, - memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1, - emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "", - parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : ""); + "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s", + ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime, + netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity, + memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1, + emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "", + parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : ""); for (i = 0; i < nsnmp_rules; i++) { my_log(LOG_INFO, "SNMP rule #%d %s:%d", - i + 1, snmp_rules[i].basename, snmp_rules[i].base); + i + 1, snmp_rules[i].basename, snmp_rules[i].base); } for (i = 0; i < npeers; i++) { switch (peers[i].type) { @@ -1688,16 +1750,16 @@ bad_collector: } snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr)); my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1, - inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c); + inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c); } pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0); timeout.tv_usec = 0; while (!killed - || (total_elements - free_elements - pending_queue_length) - || emit_count - || pending_tail->flags) { + || (total_elements - free_elements - pending_queue_length) + || emit_count + || pending_tail->flags) { if (!sigs) { timeout.tv_sec = scan_interval;