Make fprobe compress the collected flow logs.
[fprobe-ulog.git] / src / fprobe-ulog.c
index 8c66067..bca3b5d 100644 (file)
@@ -8,7 +8,7 @@
 
                        Sapan Bhatia <sapanb@cs.princeton.edu> 
                        
-       7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
+       7/11/2007       Added data collection (-f) functionality, slice_id support in the header and log file
                        rotation.
 
        15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
@@ -37,6 +37,8 @@
 #include <sys/vfs.h>
 
 #include <libipulog/libipulog.h>
+#include "vserver.h"
+
 struct ipulog_handle {
        int fd;
        u_int8_t blocking;
@@ -95,7 +97,9 @@ struct ipulog_handle {
 #include <hash.h>
 #include <mem.h>
 
-#define PIDFILE "/var/log/fprobe-ulog.pid"
+#define PIDFILE                "/var/log/fprobe-ulog.pid"
+#define LAST_EPOCH_FILE        "/var/log/fprobe_last_epoch"
+#define MAX_EPOCH_SIZE         sizeof("32767")
 #define STD_NETFLOW_PDU
 
 enum {
@@ -162,10 +166,10 @@ extern struct NetFlow NetFlow1;
 extern struct NetFlow NetFlow5;
 extern struct NetFlow NetFlow7;
 
-#define START_VALUE -5
+#define START_DATA_FD -5
 #define mark_is_tos parms[Mflag].count
 static unsigned scan_interval = 5;
-static int min_free = 0;
+static unsigned int min_free = 0;
 static int frag_lifetime = 30;
 static int inactive_lifetime = 60;
 static int active_lifetime = 300;
@@ -178,7 +182,7 @@ static int sockbufsize;
 #endif
 
 static unsigned epoch_length=60, log_epochs=1; 
-static unsigned cur_epoch=0,prev_uptime=0;
+static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
 
 static unsigned bulk_quantity = BULK_QUANTITY;
 static unsigned pending_queue_length = 100;
@@ -213,6 +217,7 @@ static int sched_min, sched_max;
 static int npeers, npeers_rot;
 static struct peer *peers;
 static int sigs;
+static char cur_output_file[MAX_PATH_LEN];
 
 static struct Flow *flows[1 << HASH_BITS];
 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
@@ -245,37 +250,37 @@ static struct passwd *pw = 0;
 void usage()
 {
        fprintf(stdout,
-               "fprobe-ulog: a NetFlow probe. Version %s\n"
-               "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
-               "\n"
-               "-h\t\tDisplay this help\n"
-               "-U <mask>\tULOG group bitwise mask [1]\n"
-               "-s <seconds>\tHow often scan for expired flows [5]\n"
-               "-g <seconds>\tFragmented flow lifetime [30]\n"
-               "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
-               "-f <filename>\tLog flow data in a file\n"
-               "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
-               "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
-               "-a <address>\tUse <address> as source for NetFlow flow\n"
-               "-X <rules>\tInterface name to SNMP-index conversion rules\n"
-               "-M\t\tUse netfilter mark value as ToS flag\n"
-               "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
-               "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
-               "-q <flows>\tPending queue length [100]\n"
-               "-B <kilobytes>\tKernel capture buffer size [0]\n"
-               "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
-               "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
-               "-c <directory>\tDirectory to chroot to\n"
-               "-u <user>\tUser to run as\n"
-               "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
-               "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
-               "-y <remote:port>\tAddress of the NetFlow collector\n"
-               "-f <writable file>\tFile to write data into\n"
-               "-T <n>\tRotate log file every n epochs\n"
-               "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
-               "-E <[1..60]>\tSize of an epoch in minutes\n"
-               "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
-               ,
+                       "fprobe-ulog: a NetFlow probe. Version %s\n"
+                       "Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
+                       "\n"
+                       "-h\t\tDisplay this help\n"
+                       "-U <mask>\tULOG group bitwise mask [1]\n"
+                       "-s <seconds>\tHow often scan for expired flows [5]\n"
+                       "-g <seconds>\tFragmented flow lifetime [30]\n"
+                       "-e <seconds>\tActive flow lifetime (active timer) [300]\n"
+                       "-f <filename>\tLog flow data in a file\n"
+                       "-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
+                       "-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
+                       "-a <address>\tUse <address> as source for NetFlow flow\n"
+                       "-X <rules>\tInterface name to SNMP-index conversion rules\n"
+                       "-M\t\tUse netfilter mark value as ToS flag\n"
+                       "-b <flows>\tMemory bulk size (1..%u) [%u]\n"
+                       "-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
+                       "-q <flows>\tPending queue length [100]\n"
+                       "-B <kilobytes>\tKernel capture buffer size [0]\n"
+                       "-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
+                       "-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
+                       "-c <directory>\tDirectory to chroot to\n"
+                       "-u <user>\tUser to run as\n"
+                       "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
+                       "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
+                       "-y <remote:port>\tAddress of the NetFlow collector\n"
+                       "-f <writable file>\tFile to write data into\n"
+                       "-T <n>\tRotate log file every n epochs\n"
+                       "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
+                       "-E <[1..60]>\tSize of an epoch in minutes\n"
+                       "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
+                       ,
                VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
        exit(0);
 }
@@ -284,14 +289,14 @@ void usage()
 void info_debug()
 {
        my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
-               pkts_total, pkts_total_fragmented, size_total,
-               pkts_pending - pkts_pending_done, pending_queue_trace);
+                       pkts_total, pkts_total_fragmented, size_total,
+                       pkts_pending - pkts_pending_done, pending_queue_trace);
        my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
-               pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
+                       pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
        my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
-               flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
+                       flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
        my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
-               total_elements, free_elements, total_memory);
+                       total_elements, free_elements, total_memory);
 }
 #endif
 
@@ -371,6 +376,7 @@ inline void copy_flow(struct Flow *src, struct Flow *dst)
        dst->sip = src->sip;
        dst->dip = src->dip;
        dst->tos = src->tos;
+       dst->slice_id = src->slice_id;
        dst->proto = src->proto;
        dst->tcp_flags = src->tcp_flags;
        dst->id = src->id;
@@ -385,13 +391,15 @@ inline void copy_flow(struct Flow *src, struct Flow *dst)
        dst->flags = src->flags;
 }
 
-void get_cur_epoch() {
+void read_cur_epoch() {
        int fd;
-       fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
+       /* Reset to -1 in case the read fails */
+       cur_epoch=-1;
+       fd = open(LAST_EPOCH_FILE, O_RDONLY);
        if (fd != -1) {
-               char snum[7];
+               char snum[MAX_EPOCH_SIZE];
                ssize_t len;
-               len = read(fd, snum, sizeof(snum)-1);
+               len = read(fd, snum, MAX_EPOCH_SIZE-1);
                if (len != -1) {
                        snum[len]='\0';
                        sscanf(snum,"%d",&cur_epoch);
@@ -403,61 +411,81 @@ void get_cur_epoch() {
 }
 
 
+/* Dumps the current epoch in a file to cope with
+ * reboots and killings of fprobe */
+
 void update_cur_epoch_file(int n) {
        int fd, len;
-       char snum[7];
-       len=snprintf(snum,6,"%d",n);
-       fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
+       char snum[MAX_EPOCH_SIZE];
+       len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
+       fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC);
        if (fd == -1) {
-               my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
+               my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
                return;
        }
        write(fd, snum, len);
        close(fd);
 }
 
-unsigned get_log_fd(char *fname, int cur_fd) {
+/* Get the file descriptor corresponding to the current file.
+ * The kludgy implementation is to abstract away the 'current
+ * file descriptor', which may also be a socket. 
+ */
+
+unsigned get_data_file_fd(char *fname, int cur_fd) {
        struct Time now;
        unsigned cur_uptime;
-       /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
-        * doesn't solve the problem */
 
        struct statfs statfs;
        int ret_fd;
+
+       /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
+        * doesn't solve the problem */
        gettime(&now);
        cur_uptime = getuptime_minutes(&now);
 
-
-       if (cur_fd!=START_VALUE) {
+       if (cur_fd != START_DATA_FD) {
                if (fstatfs(cur_fd, &statfs) == -1) {
                        my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
                }
                else {
-                       if (min_free && (statfs.f_bavail < min_free)) 
-                               switch(cur_epoch) {
-                                       case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
-                                               my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
-                                               exit(1);
-                                       default:
-                                               my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
-                                               cur_epoch = -1;
-                               }
+                       if (min_free && (statfs.f_bavail < min_free)
+                                       && (cur_epoch==last_peak))
+                       {
+                               my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
+                               cur_epoch = -1;
+                       }
+                       /*
+                          else 
+                          assume that we can reclaim space by overwriting our own files
+                          and that the difference in size will not fill the disk - sapan
+                          */
                }
        }
 
-       /* Epoch length in minutes */
+       /* If epoch length has been exceeded, 
+        * or we're starting up 
+        * or we're going back to the first epoch */
        if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
-               char nextname[MAX_PATH_LEN];
                int write_fd;
                prev_uptime = cur_uptime;
                cur_epoch = (cur_epoch + 1) % log_epochs;
-               if (cur_fd>0)
+               if (cur_epoch>last_peak) last_peak = cur_epoch;
+               if (cur_fd>0) {
                        close(cur_fd);
-               snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
-               if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
-                       my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
+            /* Compress the finished file */
+            char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")]; 
+            snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
+            system(gzip_cmd);
+        }
+               snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
+               if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
+                       my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
                        exit(1);
                }
+               if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
+                       my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
+               }
                update_cur_epoch_file(cur_epoch);
                ret_fd = write_fd;
        }
@@ -479,13 +507,13 @@ struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
 
        while (where) {
                if (where->sip.s_addr == what->sip.s_addr
-                       && where->dip.s_addr == what->dip.s_addr
-                       && where->proto == what->proto) {
+                               && where->dip.s_addr == what->dip.s_addr
+                               && where->proto == what->proto) {
                        switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
                                case 0:
                                        /* Both unfragmented */
                                        if ((what->sp == where->sp)
-                                               && (what->dp == where->dp)) goto done;
+                                                       && (what->dp == where->dp)) goto done;
                                        break;
                                case 2:
                                        /* Both fragmented */
@@ -501,11 +529,11 @@ done:
        return where;
 }
 
-int put_into(struct Flow *flow, int flag
+       int put_into(struct Flow *flow, int flag
 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
-       , char *logbuf
+                       , char *logbuf
 #endif
-)
+                   )
 {
        int ret = 0;
        hash_t h;
@@ -530,7 +558,7 @@ int put_into(struct Flow *flow, int flag
                        } else {
 #if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
                                my_log(LOG_ERR, "%s %s. %s",
-                                       "mem_alloc():", strerror(errno), "packet lost");
+                                               "mem_alloc():", strerror(errno), "packet lost");
 #endif
                                return -1;
                        }
@@ -560,27 +588,36 @@ int put_into(struct Flow *flow, int flag
                flown->tcp_flags |= flow->tcp_flags;
                flown->size += flow->size;
                flown->pkts += flow->pkts;
+
+               /* The slice_id of the first slice_id of a flow is misleading. Reset the slice_id of the flow
+                * if a better value comes along. A good example of this is that by the time CoDemux sets the
+                * peercred of a flow, it has already been accounted for here and attributed to root. */
+
+               if (flown->slice_id<1)
+                               flown->slice_id = flow->slice_id;
+
+
                if (flow->flags & FLOW_FRAG) {
                        /* Fragmented flow require some additional work */
                        if (flow->flags & FLOW_TL) {
                                /*
-                               ?FIXME?
-                               Several packets with FLOW_TL (attack)
-                               */
+                                  ?FIXME?
+                                  Several packets with FLOW_TL (attack)
+                                  */
                                flown->sp = flow->sp;
                                flown->dp = flow->dp;
                        }
                        if (flow->flags & FLOW_LASTFRAG) {
                                /*
-                               ?FIXME?
-                               Several packets with FLOW_LASTFRAG (attack)
-                               */
+                                  ?FIXME?
+                                  Several packets with FLOW_LASTFRAG (attack)
+                                  */
                                flown->sizeP = flow->sizeP;
                        }
                        flown->flags |= flow->flags;
                        flown->sizeF += flow->sizeF;
                        if ((flown->flags & FLOW_LASTFRAG)
-                               && (flown->sizeF >= flown->sizeP)) {
+                                       && (flown->sizeF >= flown->sizeP)) {
                                /* All fragments received - flow reassembled */
                                *flowpp = flown->next;
                                pthread_mutex_unlock(&flows_mutex[h]);
@@ -597,7 +634,7 @@ int put_into(struct Flow *flow, int flag
 #if ((DEBUG) & (DEBUG_U | DEBUG_S))
                                                , logbuf
 #endif
-                                       );
+                                             );
                        }
                }
                if (flag == MOVE_INTO) mem_free(flow);
@@ -606,6 +643,8 @@ int put_into(struct Flow *flow, int flag
        return ret;
 }
 
+int onlyonce=0;
+
 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 {
        int i;
@@ -622,7 +661,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 
                        case NETFLOW_IPV4_DST_ADDR:
                                ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
-                               if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
+                               if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
                                        my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
                                }
                                p += NETFLOW_IPV4_DST_ADDR_SIZE;
@@ -715,21 +754,25 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
                                break;
 
                        case NETFLOW_PAD8:
-                       /* Unsupported (uint8_t) */
+                               /* Unsupported (uint8_t) */
                        case NETFLOW_ENGINE_TYPE:
                        case NETFLOW_ENGINE_ID:
                        case NETFLOW_FLAGS7_1:
                        case NETFLOW_SRC_MASK:
                        case NETFLOW_DST_MASK:
+                               if (onlyonce) {
+                                       my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
+                                       onlyonce=1;
+                               }
                                *((uint8_t *) p) = 0;
                                p += NETFLOW_PAD8_SIZE;
                                break;
-                       case NETFLOW_XID:
-                               *((uint16_t *) p) = flow->tos;
-                               p += NETFLOW_XID_SIZE;
+                       case NETFLOW_SLICE_ID:
+                               *((uint32_t *) p) = flow->slice_id;
+                               p += NETFLOW_SLICE_ID_SIZE;
                                break;
                        case NETFLOW_PAD16:
-                       /* Unsupported (uint16_t) */
+                               /* Unsupported (uint16_t) */
                        case NETFLOW_SRC_AS:
                        case NETFLOW_DST_AS:
                        case NETFLOW_FLAGS7_2:
@@ -738,7 +781,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
                                break;
 
                        case NETFLOW_PAD32:
-                       /* Unsupported (uint32_t) */
+                               /* Unsupported (uint32_t) */
                        case NETFLOW_IPV4_NEXT_HOP:
                        case NETFLOW_ROUTER_SC:
                                *((uint32_t *) p) = 0;
@@ -747,7 +790,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 
                        default:
                                my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
-                                       format, i, format[i]);
+                                               format, i, format[i]);
                                exit(1);
                }
        }
@@ -759,9 +802,9 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 
 void setuser() {
        /*
-       Workaround for clone()-based threads
-       Try to change EUID independently of main thread
-       */
+          Workaround for clone()-based threads
+          Try to change EUID independently of main thread
+          */
        if (pw) {
                setgroups(0, NULL);
                setregid(pw->pw_gid, pw->pw_gid);
@@ -781,6 +824,8 @@ void *emit_thread()
        timeout.tv_nsec = 0;
 
        setuser();
+        
+    //pthread_mutexattr_setprotocol(&md->MutexAttr,PTHREAD_PRIO_INHERIT);
 
        for (;;) {
                pthread_mutex_lock(&emit_mutex);
@@ -789,6 +834,7 @@ void *emit_thread()
                        timeout.tv_sec = now.tv_sec + emit_timeout;
                        /* Do not wait until emit_packet will filled - it may be too long */
                        if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
+                my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
                                pthread_mutex_unlock(&emit_mutex);
                                goto sendit;
                        }
@@ -814,7 +860,7 @@ void *emit_thread()
                fflush(stdout);
 #endif
                if (emit_count == netflow->MaxFlows) {
-               sendit:
+sendit:
                        gettime(&emit_time);
                        p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
                        size = netflow->HeaderSize + emit_count * netflow->FlowSize;
@@ -825,73 +871,73 @@ void *emit_thread()
                        peer_rot_cur = 0;
                        for (i = 0; i < npeers; i++) {
                                if (peers[i].type == PEER_FILE) {
-                                               if (netflow->SeqOffset)
-                                                       *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
-                                               peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
-                                               ret = write(peers[i].write_fd, emit_packet, size);
-                                               if (ret < size) {
+                                       if (netflow->SeqOffset)
+                                               *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
+                                       peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
+                                       ret = write(peers[i].write_fd, emit_packet, size);
+                                       if (ret < size) {
 
 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
-                                                       my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
+                                               my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
                                                                i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
 #endif
 #undef MESSAGES
-                                               }
+                                       }
 #if ((DEBUG) & DEBUG_E)
-                                               commaneelse {
-                                                       my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
+                                       else {
+                                               my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
                                                                emit_count, i + 1, peers[i].seq);
-                                               }
+                                       }
 #endif
-                                               peers[i].seq += emit_count;
-
-                                               /* Rate limit */
-                                               if (emit_rate_bytes) {
-                                                       sent += size;
-                                                       delay = sent / emit_rate_bytes;
-                                                       if (delay) {
-                                                               sent %= emit_rate_bytes;
-                                                               timeout.tv_sec = 0;
-                                                               timeout.tv_nsec = emit_rate_delay * delay;
-                                                               while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
-                                                       }
+                                       peers[i].seq += emit_count;
+
+                                       /* Rate limit */
+                                       if (emit_rate_bytes) {
+                                               sent += size;
+                                               delay = sent / emit_rate_bytes;
+                                               if (delay) {
+                                                       sent %= emit_rate_bytes;
+                                                       timeout.tv_sec = 0;
+                                                       timeout.tv_nsec = emit_rate_delay * delay;
+                                                       while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
                                                }
                                        }
+                               }
                                else
-                               if (peers[i].type == PEER_MIRROR) goto sendreal;
-                               else
-                               if (peers[i].type == PEER_ROTATE) 
-                                       if (peer_rot_cur++ == peer_rot_work) {
-                                       sendreal:
-                                               if (netflow->SeqOffset)
-                                                       *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
-                                               ret = send(peers[i].write_fd, emit_packet, size, 0);
-                                               if (ret < size) {
+                                       if (peers[i].type == PEER_MIRROR) goto sendreal;
+                                       else
+                                               if (peers[i].type == PEER_ROTATE) 
+                                                       if (peer_rot_cur++ == peer_rot_work) {
+sendreal:
+                                                               if (netflow->SeqOffset)
+                                                                       *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
+                                                               ret = send(peers[i].write_fd, emit_packet, size, 0);
+                                                               if (ret < size) {
 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
-                                                       my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
-                                                               i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
+                                                                       my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
+                                                                                       i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
 #endif
-                                               }
+                                                               }
 #if ((DEBUG) & DEBUG_E)
-                                               commaneelse {
-                                                       my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
-                                                               emit_count, i + 1, peers[i].seq);
-                                               }
+                                                               else {
+                                                                       my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
+                                                                                       emit_count, i + 1, peers[i].seq);
+                                                               }
 #endif
-                                               peers[i].seq += emit_count;
-
-                                               /* Rate limit */
-                                               if (emit_rate_bytes) {
-                                                       sent += size;
-                                                       delay = sent / emit_rate_bytes;
-                                                       if (delay) {
-                                                               sent %= emit_rate_bytes;
-                                                               timeout.tv_sec = 0;
-                                                               timeout.tv_nsec = emit_rate_delay * delay;
-                                                               while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
+                                                               peers[i].seq += emit_count;
+
+                                                               /* Rate limit */
+                                                               if (emit_rate_bytes) {
+                                                                       sent += size;
+                                                                       delay = sent / emit_rate_bytes;
+                                                                       if (delay) {
+                                                                               sent %= emit_rate_bytes;
+                                                                               timeout.tv_sec = 0;
+                                                                               timeout.tv_nsec = emit_rate_delay * delay;
+                                                                               while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
+                                                                       }
+                                                               }
                                                        }
-                                               }
-                                       }
                        }
                        if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
                        emit_sequence += emit_count;
@@ -928,9 +974,9 @@ void *unpending_thread()
 #endif
                if (put_into(pending_tail, COPY_INTO
 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
-                               , logbuf
+                                       , logbuf
 #endif
-                       ) < 0) {
+                           ) < 0) {
 #if ((DEBUG) & DEBUG_I)
                        pkts_lost_unpending++;
 #endif                         
@@ -996,7 +1042,7 @@ void *scan_thread()
                                } else {
                                        /* Flow is not frgamented */
                                        if ((now.sec - flow->mtime.sec) > inactive_lifetime
-                                               || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
+                                                       || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
                                                /* Flow expired */
 #if ((DEBUG) & DEBUG_S)
                                                my_log(LOG_DEBUG, "S: E %x", flow);
@@ -1031,9 +1077,9 @@ void *scan_thread()
 #endif
                        put_into(flow, MOVE_INTO
 #if ((DEBUG) & (DEBUG_S | DEBUG_U))
-                               , logbuf
+                                       , logbuf
 #endif
-                       );
+                               );
 #if ((DEBUG) & DEBUG_S)
                        my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
 #endif
@@ -1052,6 +1098,7 @@ void *cap_thread()
        char buf[64];
        char logbuf[256];
 #endif
+       int challenge;
 
        setuser();
 
@@ -1086,11 +1133,11 @@ void *cap_thread()
 #if ((DEBUG) & DEBUG_C) || defined MESSAGES
                                my_log(LOG_ERR,
 # if ((DEBUG) & DEBUG_C)
-                                       "%s %s %s", logbuf,
+                                               "%s %s %s", logbuf,
 # else
-                                       "%s %s",
+                                               "%s %s",
 # endif
-                                       "pending queue full:", "packet lost");
+                                               "pending queue full:", "packet lost");
 #endif
 #if ((DEBUG) & DEBUG_I)
                                pkts_lost_capture++;
@@ -1112,12 +1159,26 @@ void *cap_thread()
 
                        flow->sip = nl->ip_src;
                        flow->dip = nl->ip_dst;
-                       if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
-                               my_log(LOG_INFO, "Received test flow to corewars.org");
+                       flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
+                       
+                       /* It's going to be expensive calling this syscall on every flow.
+                        * We should keep a local hash table, for now just bear the overhead... - Sapan*/
+
+                       flow->slice_id=0;
+
+                       if (ulog_msg->mark > 0) {
+                               flow->slice_id = xid_to_slice_id(ulog_msg->mark);
+                       }
+
+                       if (flow->slice_id < 1)
+                               flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid
+
+
+                       if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
+                               my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
                        }
                        flow->iif = snmp_index(ulog_msg->indev_name);
                        flow->oif = snmp_index(ulog_msg->outdev_name);
-                       flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
                        flow->proto = nl->ip_p;
                        flow->id = 0;
                        flow->tcp_flags = 0;
@@ -1134,12 +1195,12 @@ void *cap_thread()
                        off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
 
                        /*
-                       Offset (from network layer) to transport layer header/IP data
-                       IOW IP header size ;-)
+                          Offset (from network layer) to transport layer header/IP data
+                          IOW IP header size ;-)
 
-                       ?FIXME?
-                       Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
-                       */
+                          ?FIXME?
+                          Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
+                          */
                        off_tl = nl->ip_hl << 2;
                        tl = (void *) nl + off_tl;
 
@@ -1176,12 +1237,12 @@ void *cap_thread()
 #endif
 
                        /*
-                       Fortunately most interesting transport layer information fit
-                       into first 8 bytes of IP data field (minimal nonzero size).
-                       Thus we don't need actual packet reassembling to build whole
-                       transport layer data. We only check the fragment offset for
-                       zero value to find packet with this information.
-                       */
+                          Fortunately most interesting transport layer information fit
+                          into first 8 bytes of IP data field (minimal nonzero size).
+                          Thus we don't need actual packet reassembling to build whole
+                          transport layer data. We only check the fragment offset for
+                          zero value to find packet with this information.
+                          */
                        if (!off_frag && psize >= 8) {
                                switch (flow->proto) {
                                        case IPPROTO_TCP:
@@ -1211,7 +1272,7 @@ void *cap_thread()
                                                flow->dp = 0;
                                                break;
 
-                                       tl_known:
+tl_known:
 #if ((DEBUG) & DEBUG_C)
                                                sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
                                                strcat(logbuf, buf);
@@ -1222,8 +1283,8 @@ void *cap_thread()
 
                        /* Check for tcp flags presence (including CWR and ECE). */
                        if (flow->proto == IPPROTO_TCP
-                               && off_frag < 16
-                               && psize >= 16 - off_frag) {
+                                       && off_frag < 16
+                                       && psize >= 16 - off_frag) {
                                flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
 #if ((DEBUG) & DEBUG_C)
                                sprintf(buf, " TCP:%x", flow->tcp_flags);
@@ -1247,7 +1308,7 @@ void *cap_thread()
                        /* Flow complete - inform unpending_thread() about it */
                        pending_head->flags |= FLOW_PENDING;
                        pending_head = pending_head->next;
-               done:
+done:
                        pthread_cond_signal(&unpending_cond);
                }
        }
@@ -1257,39 +1318,39 @@ void *cap_thread()
 /* Copied out of CoDemux */
 
 static int init_daemon() {
-  pid_t pid;
-  FILE *pidfile;
-
-  pidfile = fopen(PIDFILE, "w");
-  if (pidfile == NULL) {
-    my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
-  }
-
-  if ((pid = fork()) < 0) {
-    fclose(pidfile);
-    my_log(LOG_ERR, "Could not fork!\n");
-    return(-1);
-  }
-  else if (pid != 0) {
-    /* i'm the parent, writing down the child pid  */
-    fprintf(pidfile, "%u\n", pid);
-    fclose(pidfile);
-    exit(0);
-  }
-
-  /* close the pid file */
-  fclose(pidfile);
-
-  /* routines for any daemon process
-     1. create a new session 
-     2. change directory to the root
-     3. change the file creation permission 
-  */
-  setsid();
-  chdir("/usr/local/fprobe");
-  umask(0);
-
-  return(0);
+       pid_t pid;
+       FILE *pidfile;
+
+       pidfile = fopen(PIDFILE, "w");
+       if (pidfile == NULL) {
+               my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
+       }
+
+       if ((pid = fork()) < 0) {
+               fclose(pidfile);
+               my_log(LOG_ERR, "Could not fork!\n");
+               return(-1);
+       }
+       else if (pid != 0) {
+               /* i'm the parent, writing down the child pid  */
+               fprintf(pidfile, "%u\n", pid);
+               fclose(pidfile);
+               exit(0);
+       }
+
+       /* close the pid file */
+       fclose(pidfile);
+
+       /* routines for any daemon process
+          1. create a new session 
+          2. change directory to the root
+          3. change the file creation permission 
+          */
+       setsid();
+       chdir("/var/local/fprobe");
+       umask(0);
+
+       return(0);
 }
 
 int main(int argc, char **argv)
@@ -1366,7 +1427,7 @@ int main(int argc, char **argv)
                if (log_suffix) *--log_suffix = ':';
        }
        if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
-       err_malloc:
+err_malloc:
                fprintf(stderr, "malloc(): %s\n", strerror(errno));
                exit(1);
        }
@@ -1381,8 +1442,8 @@ int main(int argc, char **argv)
        if (parms[rflag].count) {
                schedp.sched_priority = atoi(parms[rflag].arg);
                if (schedp.sched_priority
-                       && (schedp.sched_priority < sched_min
-                               || schedp.sched_priority > sched_max)) {
+                               && (schedp.sched_priority < sched_min
+                                       || schedp.sched_priority > sched_max)) {
                        fprintf(stderr, "Illegal %s\n", "realtime priority");
                        exit(1);
                }
@@ -1427,7 +1488,7 @@ int main(int argc, char **argv)
                sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
        if (parms[aflag].count) {
                if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
-               bad_lhost:
+bad_lhost:
                        fprintf(stderr, "Illegal %s\n", "source address");
                        exit(1);
                } else {
@@ -1513,12 +1574,12 @@ bad_collector:
                if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
                if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
                strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
-               
-               peers[npeers].write_fd = START_VALUE;
+
+               peers[npeers].write_fd = START_DATA_FD;
                peers[npeers].type = PEER_FILE;
                peers[npeers].seq = 0;
 
-               get_cur_epoch();
+               read_cur_epoch();
                npeers++;
        }
        else 
@@ -1529,12 +1590,12 @@ bad_collector:
        ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
        if (!ulog_handle) {
                fprintf(stderr, "libipulog initialization error: %s",
-                       ipulog_strerror(ipulog_errno));
+                               ipulog_strerror(ipulog_errno));
                exit(1);
        }
        if (sockbufsize)
                if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
-                       &sockbufsize, sizeof(sockbufsize)) < 0)
+                                       &sockbufsize, sizeof(sockbufsize)) < 0)
                        fprintf(stderr, "setsockopt(): %s", strerror(errno));
 
        /* Daemonize (if log destination stdout-free) */
@@ -1548,15 +1609,15 @@ bad_collector:
                while (1) {
                        int pid=fork();
                        if (pid==-1) {
-                                       fprintf(stderr, "fork(): %s", strerror(errno));
-                                       exit(1);
+                               fprintf(stderr, "fork(): %s", strerror(errno));
+                               exit(1);
                        }
                        else if (pid==0) {
-                                       setsid();
-                                       freopen("/dev/null", "r", stdin);
-                                       freopen("/dev/null", "w", stdout);
-                                       freopen("/dev/null", "w", stderr);
-                                       break;
+                               setsid();
+                               freopen("/dev/null", "r", stdin);
+                               freopen("/dev/null", "w", stdout);
+                               freopen("/dev/null", "w", stderr);
+                               break;
                        }
                        else {
                                while (wait3(NULL,0,NULL) < 1);
@@ -1573,6 +1634,7 @@ bad_collector:
 
        /* Initialization */
 
+    init_slice_id_hash();
        hash_init(); /* Actually for crc16 only */
        mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
        for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);
@@ -1586,13 +1648,13 @@ bad_collector:
        gettime(&start_time);
 
        /*
-       Build static pending queue as circular buffer.
-       */
+          Build static pending queue as circular buffer.
+          */
        if (!(pending_head = mem_alloc())) goto err_mem_alloc;
        pending_tail = pending_head;
        for (i = pending_queue_length - 1; i--;) {
                if (!(pending_tail->next = mem_alloc())) {
-               err_mem_alloc:
+err_mem_alloc:
                        my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
                        exit(1);
                }
@@ -1630,7 +1692,7 @@ bad_collector:
        for (i = 0; i < THREADS - 1; i++) {
                if (schedp.sched_priority > 0) {
                        if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
-                               (pthread_attr_setschedparam(&tattr, &schedp))) {
+                                       (pthread_attr_setschedparam(&tattr, &schedp))) {
                                my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
                                exit(1);
                        }
@@ -1667,15 +1729,15 @@ bad_collector:
 
        my_log(LOG_INFO, "pid: %d", pid);
        my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
-               "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
-               ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
-               netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
-               memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
-               emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
-               parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
+                       "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
+                       ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
+                       netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
+                       memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
+                       emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
+                       parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
        for (i = 0; i < nsnmp_rules; i++) {
                my_log(LOG_INFO, "SNMP rule #%d %s:%d",
-                       i + 1, snmp_rules[i].basename, snmp_rules[i].base);
+                               i + 1, snmp_rules[i].basename, snmp_rules[i].base);
        }
        for (i = 0; i < npeers; i++) {
                switch (peers[i].type) {
@@ -1688,16 +1750,16 @@ bad_collector:
                }
                snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
                my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
-                       inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
+                               inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
        }
 
        pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
 
        timeout.tv_usec = 0;
        while (!killed
-               || (total_elements - free_elements - pending_queue_length)
-               || emit_count
-               || pending_tail->flags) {
+                       || (total_elements - free_elements - pending_queue_length)
+                       || emit_count
+                       || pending_tail->flags) {
 
                if (!sigs) {
                        timeout.tv_sec = scan_interval;