Make fprobe compress the collected flow logs.
[fprobe-ulog.git] / src / fprobe-ulog.c
index 8c626c4..bca3b5d 100644 (file)
@@ -8,7 +8,7 @@
 
                        Sapan Bhatia <sapanb@cs.princeton.edu> 
                        
-       7/11/2007       Added data collection (-f) functionality, xid support in the header and log file
+       7/11/2007       Added data collection (-f) functionality, slice_id support in the header and log file
                        rotation.
 
        15/11/2007      Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
@@ -217,6 +217,7 @@ static int sched_min, sched_max;
 static int npeers, npeers_rot;
 static struct peer *peers;
 static int sigs;
+static char cur_output_file[MAX_PATH_LEN];
 
 static struct Flow *flows[1 << HASH_BITS];
 static pthread_mutex_t flows_mutex[1 << HASH_BITS];
@@ -375,7 +376,7 @@ inline void copy_flow(struct Flow *src, struct Flow *dst)
        dst->sip = src->sip;
        dst->dip = src->dip;
        dst->tos = src->tos;
-       dst->xid = src->xid;
+       dst->slice_id = src->slice_id;
        dst->proto = src->proto;
        dst->tcp_flags = src->tcp_flags;
        dst->id = src->id;
@@ -417,7 +418,7 @@ void update_cur_epoch_file(int n) {
        int fd, len;
        char snum[MAX_EPOCH_SIZE];
        len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
-       fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRWXU|S_IRGRP|S_IROTH);
+       fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC);
        if (fd == -1) {
                my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
                return;
@@ -466,18 +467,25 @@ unsigned get_data_file_fd(char *fname, int cur_fd) {
         * or we're starting up 
         * or we're going back to the first epoch */
        if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
-               char nextname[MAX_PATH_LEN];
                int write_fd;
                prev_uptime = cur_uptime;
                cur_epoch = (cur_epoch + 1) % log_epochs;
                if (cur_epoch>last_peak) last_peak = cur_epoch;
-               if (cur_fd>0)
+               if (cur_fd>0) {
                        close(cur_fd);
-               snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
-               if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
-                       my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
+            /* Compress the finished file */
+            char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")]; 
+            snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
+            system(gzip_cmd);
+        }
+               snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
+               if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
+                       my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
                        exit(1);
                }
+               if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
+                       my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
+               }
                update_cur_epoch_file(cur_epoch);
                ret_fd = write_fd;
        }
@@ -580,6 +588,15 @@ done:
                flown->tcp_flags |= flow->tcp_flags;
                flown->size += flow->size;
                flown->pkts += flow->pkts;
+
+               /* The slice_id first recorded for a flow can be misleading. Reset the slice_id of the flow
+                * if a better value comes along. A good example of this is that by the time CoDemux sets the
+                * peercred of a flow, it has already been accounted for here and attributed to root. */
+
+               if (flown->slice_id<1)
+                               flown->slice_id = flow->slice_id;
+
+
                if (flow->flags & FLOW_FRAG) {
                        /* Fragmented flow require some additional work */
                        if (flow->flags & FLOW_TL) {
@@ -644,7 +661,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 
                        case NETFLOW_IPV4_DST_ADDR:
                                ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
-                               if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
+                               if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
                                        my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
                                }
                                p += NETFLOW_IPV4_DST_ADDR_SIZE;
@@ -750,9 +767,9 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
                                *((uint8_t *) p) = 0;
                                p += NETFLOW_PAD8_SIZE;
                                break;
-                       case NETFLOW_XID:
-                               *((uint32_t *) p) = flow->xid;
-                               p += NETFLOW_XID_SIZE;
+                       case NETFLOW_SLICE_ID:
+                               *((uint32_t *) p) = flow->slice_id;
+                               p += NETFLOW_SLICE_ID_SIZE;
                                break;
                        case NETFLOW_PAD16:
                                /* Unsupported (uint16_t) */
@@ -807,6 +824,8 @@ void *emit_thread()
        timeout.tv_nsec = 0;
 
        setuser();
+        
+    //pthread_mutexattr_setprotocol(&md->MutexAttr,PTHREAD_PRIO_INHERIT);
 
        for (;;) {
                pthread_mutex_lock(&emit_mutex);
@@ -815,6 +834,7 @@ void *emit_thread()
                        timeout.tv_sec = now.tv_sec + emit_timeout;
                        /* Do not wait until emit_packet will filled - it may be too long */
                        if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
+                my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
                                pthread_mutex_unlock(&emit_mutex);
                                goto sendit;
                        }
@@ -864,7 +884,7 @@ sendit:
 #undef MESSAGES
                                        }
 #if ((DEBUG) & DEBUG_E)
-                                       commaneelse {
+                                       else {
                                                my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
                                                                emit_count, i + 1, peers[i].seq);
                                        }
@@ -899,7 +919,7 @@ sendreal:
 #endif
                                                                }
 #if ((DEBUG) & DEBUG_E)
-                                                               commaneelse {
+                                                               else {
                                                                        my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
                                                                                        emit_count, i + 1, peers[i].seq);
                                                                }
@@ -1143,17 +1163,19 @@ void *cap_thread()
                        
                        /* It's going to be expensive calling this syscall on every flow.
                         * We should keep a local hash table, for now just bear the overhead... - Sapan*/
+
+                       flow->slice_id=0;
+
                        if (ulog_msg->mark > 0) {
-                               flow->xid = get_vhi_name(ulog_msg->mark);
-                               challenge = get_vhi_name(ulog_msg->mark);
+                               flow->slice_id = xid_to_slice_id(ulog_msg->mark);
                        }
 
-                       if (flow->xid < 1 || flow->xid!=challenge) 
-                               flow->xid = ulog_msg->mark;
+                       if (flow->slice_id < 1)
+                               flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid
 
 
-                       if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
-                               my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->xid);
+                       if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
+                               my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
                        }
                        flow->iif = snmp_index(ulog_msg->indev_name);
                        flow->oif = snmp_index(ulog_msg->outdev_name);
@@ -1612,6 +1634,7 @@ bad_collector:
 
        /* Initialization */
 
+    init_slice_id_hash();
        hash_init(); /* Actually for crc16 only */
        mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
        for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);