Bug fixes.
[fprobe-ulog.git] / src / fprobe-ulog.c
index cdcf170..0d84804 100644 (file)
@@ -5,6 +5,11 @@
        modify it under the terms of the GNU General Public License.
 
        $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
+
+       7/11/2007       Sapan Bhatia <sapanb@cs.princeton.edu> 
+                       
+               Added data collection (-f) functionality, xid support in the header and log file
+               rotation.
 */
 
 #include <common.h>
@@ -90,6 +95,7 @@ enum {
        cflag,
        dflag,
        eflag,
+       Eflag,
        fflag,
        gflag,
        hflag,
@@ -101,6 +107,7 @@ enum {
        rflag,
        sflag,
        tflag,
+       Tflag,
        Uflag,
        uflag,
        vflag,
@@ -114,6 +121,7 @@ static struct getopt_parms parms[] = {
        {'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'g', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'h', 0, 0, 0},
@@ -125,6 +133,7 @@ static struct getopt_parms parms[] = {
        {'r', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'s', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'t', MY_GETOPT_ARG_REQUIRED, 0, 0},
+       {'T', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'U', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'u', MY_GETOPT_ARG_REQUIRED, 0, 0},
        {'v', MY_GETOPT_ARG_REQUIRED, 0, 0},
@@ -152,6 +161,10 @@ static int sockbufsize;
 #else
 #define BULK_QUANTITY 200
 #endif
+
+static unsigned epoch_length=60, log_epochs=1; 
+static unsigned cur_epoch=0,prev_uptime=0;
+
 static unsigned bulk_quantity = BULK_QUANTITY;
 static unsigned pending_queue_length = 100;
 static struct NetFlow *netflow = &NetFlow5;
@@ -241,8 +254,10 @@ void usage()
                "-u <user>\tUser to run as\n"
                "-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
                "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
-               "-y <remote:port>\tAddress of the NetFlow collector\n",
+               "-y <remote:port>\tAddress of the NetFlow collector\n"
                "-f <writable file>\tFile to write data into\n"
+               "-T <n>\tRotate log file every n epochs\n"
+               "-E <[1..60]>\tSize of an epoch in minutes\n",
                VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
        exit(0);
 }
@@ -285,6 +300,12 @@ void gettime(struct Time *now)
        now->usec = t.tv_usec;
 }
 
+
+inline time_t cmpMtime(struct Time *t1, struct Time *t2)
+{
+       return (t1->sec - t2->sec)/60;
+}
+
 inline time_t cmpmtime(struct Time *t1, struct Time *t2)
 {
        return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
@@ -297,6 +318,13 @@ uint32_t getuptime(struct Time *t)
        return cmpmtime(t, &start_time);
 }
 
+/* Uptime in minutes */
+uint32_t getuptime_minutes(struct Time *t)
+{
+       /* Maximum uptime is about 49/2 days */
+       return cmpMtime(t, &start_time);
+}
+
 hash_t hash_flow(struct Flow *flow)
 {
        if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
@@ -339,6 +367,32 @@ inline void copy_flow(struct Flow *src, struct Flow *dst)
        dst->flags = src->flags;
 }
 
+unsigned get_log_fd(char *fname, unsigned cur_fd) {
+       struct Time now;
+       unsigned cur_uptime;
+       int ret_fd;
+       gettime(&now);
+       cur_uptime = getuptime_minutes(&now);
+
+       /* Epoch length in minutes */
+       if ((cur_uptime - prev_uptime) > epoch_length || cur_fd==-1) {
+               char nextname[MAX_PATH_LEN];
+               int write_fd;
+               prev_uptime = cur_uptime;
+               cur_epoch = (cur_epoch + 1) % log_epochs;
+               close(cur_fd);
+               snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
+               if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
+                       fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
+                       exit(1);
+               }
+               ret_fd = write_fd;
+       }
+       else
+               ret_fd = cur_fd;
+       return(ret_fd);
+}
+
 struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev)
 {
        struct Flow **flowpp;
@@ -494,6 +548,9 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 
                        case NETFLOW_IPV4_DST_ADDR:
                                ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
+                               if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
+                                       my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
+                               }
                                p += NETFLOW_IPV4_DST_ADDR_SIZE;
                                break;
 
@@ -593,7 +650,10 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
                                *((uint8_t *) p) = 0;
                                p += NETFLOW_PAD8_SIZE;
                                break;
-
+                       case NETFLOW_XID:
+                               *((uint16_t *) p) = flow->tos;
+                               p += NETFLOW_XID_SIZE;
+                               break;
                        case NETFLOW_PAD16:
                        /* Unsupported (uint16_t) */
                        case NETFLOW_SRC_AS:
@@ -685,13 +745,13 @@ void *emit_thread()
                        p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
                        size = netflow->HeaderSize + emit_count * netflow->FlowSize;
                        /* Netflow PDUs need to be padded to 1464 bytes - Sapan */
-                       if (size < 1464) size = 1464;
+                       if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
                        peer_rot_cur = 0;
                        for (i = 0; i < npeers; i++) {
-                               if (peers[0].type == PEER_FILE) {
+                               if (peers[i].type == PEER_FILE) {
                                                if (netflow->SeqOffset)
                                                        *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
-#define MESSAGES
+                                               peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
                                                ret = write(peers[0].write_fd, emit_packet, size);
                                                if (ret < size) {
 #if ((DEBUG) & DEBUG_E) || defined MESSAGES
@@ -975,6 +1035,9 @@ void *cap_thread()
 
                        flow->sip = nl->ip_src;
                        flow->dip = nl->ip_dst;
+                       if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
+                               my_log(LOG_INFO, "Received test flow to corewars.org");
+                       }
                        flow->iif = snmp_index(ulog_msg->indev_name);
                        flow->oif = snmp_index(ulog_msg->outdev_name);
                        flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
@@ -1149,6 +1212,8 @@ int main(int argc, char **argv)
        }
 
        if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
+       if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
+       if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
        if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
        if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
        if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
@@ -1328,16 +1393,13 @@ bad_collector:
        }
        else if (parms[fflag].count) {
                // log into a file
-               char *fname;
                if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
-               fname = parms[fflag].arg;
-               if ((write_fd = open(fname, O_WRONLY|O_CREAT)) < 0) {
-                       fprintf(stderr, "open(): %s (%s)\n", fname, strerror(errno));
-                       exit(1);
-               }
-               peers[0].write_fd = write_fd;
-               peers[0].type = PEER_FILE;
-               peers[0].seq = 0;
+               if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
+               strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
+               
+               peers[npeers].write_fd = -1;
+               peers[npeers].type = PEER_FILE;
+               peers[npeers].seq = 0;
                npeers++;
        }
        else