Sapan Bhatia <sapanb@cs.princeton.edu>
- 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file
+ 7/11/2007 Added data collection (-f) functionality, slice_id support in the header and log file
rotation.
15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
#include <sys/vfs.h>
#include <libipulog/libipulog.h>
+#include "vserver.h"
+
struct ipulog_handle {
int fd;
u_int8_t blocking;
#include <hash.h>
#include <mem.h>
-#define PIDFILE "/var/log/fprobe-ulog.pid"
+#define PIDFILE "/var/log/fprobe-ulog.pid"
+#define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch"
+#define MAX_EPOCH_SIZE sizeof("32767")
#define STD_NETFLOW_PDU
enum {
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
-#define START_VALUE -5
+#define START_DATA_FD -5
#define mark_is_tos parms[Mflag].count
static unsigned scan_interval = 5;
-static int min_free = 0;
+static unsigned int min_free = 0;
static int frag_lifetime = 30;
static int inactive_lifetime = 60;
static int active_lifetime = 300;
static int npeers, npeers_rot;
static struct peer *peers;
static int sigs;
+static char cur_output_file[MAX_PATH_LEN];
static struct Flow *flows[1 << HASH_BITS];
static pthread_mutex_t flows_mutex[1 << HASH_BITS];
dst->sip = src->sip;
dst->dip = src->dip;
dst->tos = src->tos;
+ dst->slice_id = src->slice_id;
dst->proto = src->proto;
dst->tcp_flags = src->tcp_flags;
dst->id = src->id;
dst->flags = src->flags;
}
-void get_cur_epoch() {
+void read_cur_epoch() {
int fd;
- fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
+ /* Reset to -1 in case the read fails */
+ cur_epoch=-1;
+ fd = open(LAST_EPOCH_FILE, O_RDONLY);
if (fd != -1) {
- char snum[7];
+ char snum[MAX_EPOCH_SIZE];
ssize_t len;
- len = read(fd, snum, sizeof(snum)-1);
+ len = read(fd, snum, MAX_EPOCH_SIZE-1);
if (len != -1) {
snum[len]='\0';
sscanf(snum,"%d",&cur_epoch);
}
+/* Dumps the current epoch in a file to cope with
+ * reboots and killings of fprobe */
+
void update_cur_epoch_file(int n) {
int fd, len;
- char snum[7];
- len=snprintf(snum,6,"%d",n);
- fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
+ char snum[MAX_EPOCH_SIZE];
+ len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
+ fd = open(LAST_EPOCH_FILE, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
if (fd == -1) {
- my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
+ my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
return;
}
write(fd, snum, len);
close(fd);
}
-unsigned get_log_fd(char *fname, int cur_fd) {
+/* Get the file descriptor corresponding to the current file.
+ * The kludgy implementation is to abstract away the 'current
+ * file descriptor', which may also be a socket.
+ */
+
+unsigned get_data_file_fd(char *fname, int cur_fd) {
struct Time now;
unsigned cur_uptime;
- /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
- * doesn't solve the problem */
struct statfs statfs;
int ret_fd;
+
+ /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
+ * doesn't solve the problem */
gettime(&now);
cur_uptime = getuptime_minutes(&now);
-
- if (cur_fd!=START_VALUE) {
+ if (cur_fd != START_DATA_FD) {
if (fstatfs(cur_fd, &statfs) == -1) {
my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
}
}
}
- /* Epoch length in minutes */
+ /* If epoch length has been exceeded,
+ * or we're starting up
+ * or we're going back to the first epoch */
if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
- char nextname[MAX_PATH_LEN];
int write_fd;
prev_uptime = cur_uptime;
cur_epoch = (cur_epoch + 1) % log_epochs;
- last_peak = cur_epoch;
- if (cur_fd>0)
+ if (cur_epoch>last_peak) last_peak = cur_epoch;
+ if (cur_fd>0) {
close(cur_fd);
- snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
- if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
- my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
+ /* Compress the finished file */
+ char gzip_cmd[MAX_PATH_LEN+sizeof("gzip -f ")];
+ snprintf(gzip_cmd, MAX_PATH_LEN+sizeof("gzip -f "),"gzip -f %s",cur_output_file);
+ system(gzip_cmd);
+ }
+ snprintf(cur_output_file,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
+ if ((write_fd = open(cur_output_file, O_RDWR|O_CREAT|O_TRUNC,S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH)) < 0) {
+ my_log(LOG_ERR, "open(): %s (%s)\n", cur_output_file, strerror(errno));
exit(1);
}
+ if (fchmod(write_fd,S_IRUSR|S_IWUSR|S_IROTH|S_IRGRP) == -1) {
+ my_log(LOG_ERR, "fchmod() failed: %s (%s). Continuing...\n", cur_output_file, strerror(errno));
+ }
update_cur_epoch_file(cur_epoch);
ret_fd = write_fd;
}
flown->tcp_flags |= flow->tcp_flags;
flown->size += flow->size;
flown->pkts += flow->pkts;
+
+ /* The first slice_id recorded for a flow can be misleading. Reset the slice_id of the flow
+ * if a better value comes along. A good example of this is that by the time CoDemux sets the
+ * peercred of a flow, it has already been accounted for here and attributed to root. */
+
+ if (flown->slice_id<1)
+ flown->slice_id = flow->slice_id;
+
+
if (flow->flags & FLOW_FRAG) {
/* Fragmented flow require some additional work */
if (flow->flags & FLOW_TL) {
return ret;
}
+int onlyonce=0;
+
void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
{
int i;
case NETFLOW_IPV4_DST_ADDR:
((struct in_addr *) p)->s_addr = flow->dip.s_addr;
- if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
+ if ((flow->dip.s_addr == inet_addr("10.0.0.8"))) {
my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
}
p += NETFLOW_IPV4_DST_ADDR_SIZE;
case NETFLOW_FLAGS7_1:
case NETFLOW_SRC_MASK:
case NETFLOW_DST_MASK:
+ if (onlyonce) {
+ my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
+ onlyonce=1;
+ }
*((uint8_t *) p) = 0;
p += NETFLOW_PAD8_SIZE;
break;
- case NETFLOW_XID:
- *((uint16_t *) p) = flow->tos;
- p += NETFLOW_XID_SIZE;
+ case NETFLOW_SLICE_ID:
+ *((uint32_t *) p) = flow->slice_id;
+ p += NETFLOW_SLICE_ID_SIZE;
break;
case NETFLOW_PAD16:
/* Unsupported (uint16_t) */
timeout.tv_nsec = 0;
setuser();
-
+
for (;;) {
pthread_mutex_lock(&emit_mutex);
while (!flows_emit) {
timeout.tv_sec = now.tv_sec + emit_timeout;
/* Do not wait until emit_packet will filled - it may be too long */
if (pthread_cond_timedwait(&emit_cond, &emit_mutex, &timeout) && emit_count) {
+ //my_log(LOG_INFO,"Timeout: %d, %d",emit_count, timeout.tv_sec);
pthread_mutex_unlock(&emit_mutex);
goto sendit;
}
if (peers[i].type == PEER_FILE) {
if (netflow->SeqOffset)
*((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
- peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
+ peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
ret = write(peers[i].write_fd, emit_packet, size);
if (ret < size) {
#undef MESSAGES
}
#if ((DEBUG) & DEBUG_E)
- commaneelse {
+ else {
my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
emit_count, i + 1, peers[i].seq);
}
#endif
}
#if ((DEBUG) & DEBUG_E)
- commaneelse {
+ else {
my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
emit_count, i + 1, peers[i].seq);
}
char buf[64];
char logbuf[256];
#endif
+ int challenge;
setuser();
flow->sip = nl->ip_src;
flow->dip = nl->ip_dst;
flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
- if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
- my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos);
+
+ /* It's going to be expensive calling this syscall on every flow.
+ * We should keep a local hash table, for now just bear the overhead... - Sapan*/
+
+ flow->slice_id=0;
+
+ if (ulog_msg->mark > 0) {
+ flow->slice_id = xid_to_slice_id(ulog_msg->mark);
+ }
+
+ if (flow->slice_id < 1)
+ flow->slice_id = ulog_msg->mark; // Couldn't look up the slice id, let's at least store the local xid
+
+
+ if ((flow->dip.s_addr == inet_addr("10.0.0.8")) || (flow->sip.s_addr == inet_addr("10.0.0.8"))) {
+ my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->slice_id);
}
flow->iif = snmp_index(ulog_msg->indev_name);
flow->oif = snmp_index(ulog_msg->outdev_name);
if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
- peers[npeers].write_fd = START_VALUE;
+ peers[npeers].write_fd = START_DATA_FD;
peers[npeers].type = PEER_FILE;
peers[npeers].seq = 0;
- get_cur_epoch();
+ read_cur_epoch();
npeers++;
}
else
/* Initialization */
+ init_slice_id_hash();
hash_init(); /* Actually for crc16 only */
mem_init(sizeof(struct Flow), bulk_quantity, memory_limit);
for (i = 0; i < 1 << HASH_BITS; i++) pthread_mutex_init(&flows_mutex[i], 0);