Initialize hash, change init function name so it doesn't collide with fprobe-ulog's
[fprobe-ulog.git] / src / fprobe-ulog.c
index 0c573ec..80ff15f 100644 (file)
@@ -37,6 +37,8 @@
 #include <sys/vfs.h>
 
 #include <libipulog/libipulog.h>
+#include "vserver.h"
+
 struct ipulog_handle {
        int fd;
        u_int8_t blocking;
@@ -95,7 +97,9 @@ struct ipulog_handle {
 #include <hash.h>
 #include <mem.h>
 
-#define PIDFILE "/var/log/fprobe-ulog.pid"
+#define PIDFILE                "/var/log/fprobe-ulog.pid"
+#define LAST_EPOCH_FILE        "/var/log/fprobe_last_epoch"
+#define MAX_EPOCH_SIZE         sizeof("32767")
 #define STD_NETFLOW_PDU
 
 enum {
@@ -162,10 +166,10 @@ extern struct NetFlow NetFlow1;
 extern struct NetFlow NetFlow5;
 extern struct NetFlow NetFlow7;
 
-#define START_VALUE -5
+#define START_DATA_FD -5
 #define mark_is_tos parms[Mflag].count
 static unsigned scan_interval = 5;
-static int min_free = 0;
+static unsigned int min_free = 0;
 static int frag_lifetime = 30;
 static int inactive_lifetime = 60;
 static int active_lifetime = 300;
@@ -371,6 +375,7 @@ inline void copy_flow(struct Flow *src, struct Flow *dst)
        dst->sip = src->sip;
        dst->dip = src->dip;
        dst->tos = src->tos;
+       dst->xid = src->xid;
        dst->proto = src->proto;
        dst->tcp_flags = src->tcp_flags;
        dst->id = src->id;
@@ -385,13 +390,15 @@ inline void copy_flow(struct Flow *src, struct Flow *dst)
        dst->flags = src->flags;
 }
 
-void get_cur_epoch() {
+void read_cur_epoch() {
        int fd;
-       fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
+       /* Reset to -1 in case the read fails */
+       cur_epoch=-1;
+       fd = open(LAST_EPOCH_FILE, O_RDONLY);
        if (fd != -1) {
-               char snum[7];
+               char snum[MAX_EPOCH_SIZE];
                ssize_t len;
-               len = read(fd, snum, sizeof(snum)-1);
+               len = read(fd, snum, MAX_EPOCH_SIZE-1);
                if (len != -1) {
                        snum[len]='\0';
                        sscanf(snum,"%d",&cur_epoch);
@@ -403,32 +410,40 @@ void get_cur_epoch() {
 }
 
 
+/* Saves the current epoch to a file so that it survives
+ * reboots and fprobe being killed */
+
 void update_cur_epoch_file(int n) {
        int fd, len;
-       char snum[7];
-       len=snprintf(snum,6,"%d",n);
-       fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
+       char snum[MAX_EPOCH_SIZE];
+       len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
+       fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC);
        if (fd == -1) {
-               my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
+               my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
                return;
        }
        write(fd, snum, len);
        close(fd);
 }
 
-unsigned get_log_fd(char *fname, int cur_fd) {
+/* Get the file descriptor corresponding to the current file.
+ * The kludgy implementation is to abstract away the 'current
+ * file descriptor', which may also be a socket. 
+ */
+
+unsigned get_data_file_fd(char *fname, int cur_fd) {
        struct Time now;
        unsigned cur_uptime;
-       /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
-        * doesn't solve the problem */
 
        struct statfs statfs;
        int ret_fd;
+
+       /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
+        * doesn't solve the problem */
        gettime(&now);
        cur_uptime = getuptime_minutes(&now);
 
-
-       if (cur_fd!=START_VALUE) {
+       if (cur_fd != START_DATA_FD) {
                if (fstatfs(cur_fd, &statfs) == -1) {
                        my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
                }
@@ -447,7 +462,9 @@ unsigned get_log_fd(char *fname, int cur_fd) {
                }
        }
 
-       /* Epoch length in minutes */
+       /* If epoch length has been exceeded, 
+        * or we're starting up 
+        * or we're going back to the first epoch */
        if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
                char nextname[MAX_PATH_LEN];
                int write_fd;
@@ -457,10 +474,13 @@ unsigned get_log_fd(char *fname, int cur_fd) {
                if (cur_fd>0)
                        close(cur_fd);
                snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
-               if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
+               if ((write_fd = open(nextname, O_RDWR|O_CREAT|O_TRUNC)) < 0) {
                        my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
                        exit(1);
                }
+               if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
+                       my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", nextname, strerror(errno));
+               }
                update_cur_epoch_file(cur_epoch);
                ret_fd = write_fd;
        }
@@ -563,6 +583,15 @@ done:
                flown->tcp_flags |= flow->tcp_flags;
                flown->size += flow->size;
                flown->pkts += flow->pkts;
+
+               /* The xid taken from the first packet of a flow can be misleading. Reset the xid of the flow
+                * if a better value comes along. A good example of this is that by the time CoDemux sets the
+                * peercred of a flow, it has already been accounted for here and attributed to root. */
+
+               if (flown->xid<1)
+                               flown->xid = flow->xid;
+
+
                if (flow->flags & FLOW_FRAG) {
                        /* Fragmented flow require some additional work */
                        if (flow->flags & FLOW_TL) {
@@ -609,6 +638,8 @@ done:
        return ret;
 }
 
+int onlyonce=0;
+
 void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 {
        int i;
@@ -625,7 +656,7 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
 
                        case NETFLOW_IPV4_DST_ADDR:
                                ((struct in_addr *) p)->s_addr = flow->dip.s_addr;
-                               if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
+                               if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
                                        my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
                                }
                                p += NETFLOW_IPV4_DST_ADDR_SIZE;
@@ -724,11 +755,15 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
                        case NETFLOW_FLAGS7_1:
                        case NETFLOW_SRC_MASK:
                        case NETFLOW_DST_MASK:
+                               if (onlyonce) {
+                                       my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
+                                       onlyonce=1;
+                               }
                                *((uint8_t *) p) = 0;
                                p += NETFLOW_PAD8_SIZE;
                                break;
                        case NETFLOW_XID:
-                               *((uint16_t *) p) = flow->tos;
+                               *((uint32_t *) p) = flow->xid;
                                p += NETFLOW_XID_SIZE;
                                break;
                        case NETFLOW_PAD16:
@@ -830,7 +865,7 @@ sendit:
                                if (peers[i].type == PEER_FILE) {
                                        if (netflow->SeqOffset)
                                                *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
-                                       peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
+                                       peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
                                        ret = write(peers[i].write_fd, emit_packet, size);
                                        if (ret < size) {
 
@@ -1055,6 +1090,7 @@ void *cap_thread()
        char buf[64];
        char logbuf[256];
 #endif
+       int challenge;
 
        setuser();
 
@@ -1116,8 +1152,23 @@ void *cap_thread()
                        flow->sip = nl->ip_src;
                        flow->dip = nl->ip_dst;
                        flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
-                       if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
-                               my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos);
+                       
+                       /* It's going to be expensive calling this syscall on every flow.
+                        * We should keep a local hash table, for now just bear the overhead... - Sapan*/
+
+                       flow->xid=0;
+
+                       if (ulog_msg->mark > 0) {
+                /* flow->xid is really the slice id :-/ */
+                               flow->xid = xid_to_slice_id(ulog_msg->mark);
+                       }
+
+                       if (flow->xid < 1 || flow->xid!=challenge) 
+                               flow->xid = ulog_msg->mark;
+
+
+                       if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
+                               my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->xid);
                        }
                        flow->iif = snmp_index(ulog_msg->indev_name);
                        flow->oif = snmp_index(ulog_msg->outdev_name);
@@ -1517,11 +1568,11 @@ bad_collector:
                if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
                strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
 
-               peers[npeers].write_fd = START_VALUE;
+               peers[npeers].write_fd = START_DATA_FD;
                peers[npeers].type = PEER_FILE;
                peers[npeers].seq = 0;
 
-               get_cur_epoch();
+               read_cur_epoch();
                npeers++;
        }
        else 
@@ -1576,6 +1627,7 @@ bad_collector:
 
        /* Initialization */
 
+    init_slice_id_hash();
        hash_init(); /* Actually for crc16 only */
        mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
        for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);