$Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
- 7/11/2007 Sapan Bhatia <sapanb@cs.princeton.edu>
+ Sapan Bhatia <sapanb@cs.princeton.edu>
- Added data collection (-f) functionality, xid support in the header and log file
- rotation.
+ 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file
+ rotation.
+
+ 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
+
*/
#include <common.h>
/* statfs() */
-#include <sys/statfs.h>
+#include <sys/vfs.h>
#include <libipulog/libipulog.h>
+#include "vserver.h"
+
struct ipulog_handle {
int fd;
u_int8_t blocking;
#include <hash.h>
#include <mem.h>
+#define PIDFILE "/var/log/fprobe-ulog.pid"
+#define LAST_EPOCH_FILE "/var/log/fprobe_last_epoch"
+#define MAX_EPOCH_SIZE sizeof("32767")
+#define STD_NETFLOW_PDU
+
enum {
aflag,
Bflag,
{'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
{'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
{'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
{'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
{'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
{'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
-#define START_VALUE -5
+#define START_DATA_FD -5
#define mark_is_tos parms[Mflag].count
static unsigned scan_interval = 5;
-static int min_free = 0;
+static unsigned int min_free = 0;
static int frag_lifetime = 30;
static int inactive_lifetime = 60;
static int active_lifetime = 300;
#endif
static unsigned epoch_length=60, log_epochs=1;
-static unsigned cur_epoch=0,prev_uptime=0;
+static unsigned cur_epoch=0,prev_uptime=0,last_peak=0;
static unsigned bulk_quantity = BULK_QUANTITY;
static unsigned pending_queue_length = 100;
void usage()
{
	fprintf(stdout,
-	"fprobe-ulog: a NetFlow probe. Version %s\n"
-	"Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
-	"\n"
-	"-h\t\tDisplay this help\n"
-	"-U <mask>\tULOG group bitwise mask [1]\n"
-	"-s <seconds>\tHow often scan for expired flows [5]\n"
-	"-g <seconds>\tFragmented flow lifetime [30]\n"
-	"-e <seconds>\tActive flow lifetime (active timer) [300]\n"
-	"-f <filename>\tLog flow data in a file\n"
-	"-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
-	"-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
-	"-a <address>\tUse <address> as source for NetFlow flow\n"
-	"-X <rules>\tInterface name to SNMP-index conversion rules\n"
-	"-M\t\tUse netfilter mark value as ToS flag\n"
-	"-b <flows>\tMemory bulk size (1..%u) [%u]\n"
-	"-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
-	"-q <flows>\tPending queue length [100]\n"
-	"-B <kilobytes>\tKernel capture buffer size [0]\n"
-	"-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
-	"-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
-	"-c <directory>\tDirectory to chroot to\n"
-	"-u <user>\tUser to run as\n"
-	"-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
-	"-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
-	"-y <remote:port>\tAddress of the NetFlow collector\n"
-	"-f <writable file>\tFile to write data into\n"
-	"-T <n>\tRotate log file every n epochs\n"
-	"-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
-	"-E <[1..60]>\tSize of an epoch in minutes\n"
-	"-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
-	,
+	"fprobe-ulog: a NetFlow probe. Version %s\n"
+	"Usage: fprobe-ulog [options] remote:port[/[local][/type]] ...\n"
+	"\n"
+	"-h\t\tDisplay this help\n"
+	"-U <mask>\tULOG group bitwise mask [1]\n"
+	"-s <seconds>\tHow often scan for expired flows [5]\n"
+	"-g <seconds>\tFragmented flow lifetime [30]\n"
+	"-e <seconds>\tActive flow lifetime (active timer) [300]\n"
+	"-f <filename>\tLog flow data in a file\n"
+	"-G <graduation period>\tRotate logs on an 0-hourly, 1-daily basis\n"
+	"-n <version>\tNetFlow version for use (1, 5 or 7) [5]\n"
+	"-a <address>\tUse <address> as source for NetFlow flow\n"
+	"-X <rules>\tInterface name to SNMP-index conversion rules\n"
+	"-M\t\tUse netfilter mark value as ToS flag\n"
+	"-b <flows>\tMemory bulk size (1..%u) [%u]\n"
+	"-m <kilobytes>\tMemory limit (0=no limit) [0]\n"
+	"-q <flows>\tPending queue length [100]\n"
+	"-B <kilobytes>\tKernel capture buffer size [0]\n"
+	"-r <priority>\tReal-time priority (0=disabled, %d..%d) [0]\n"
+	"-t <B:N>\tProduce <N> nanosecond delay after each <B> bytes sent [0:0]\n"
+	"-c <directory>\tDirectory to chroot to\n"
+	"-u <user>\tUser to run as\n"
+	"-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
+	"-l <[dst][:id]>\tLog destination and log/pidfile identifier [1]\n"
+	"-y <remote:port>\tAddress of the NetFlow collector\n"
+	"-T <n>\tRotate log file every n epochs\n"
+	"-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
+	"-E <[1..60]>\tSize of an epoch in minutes\n"
+	"-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
+	,
	VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
	exit(0);
}
void info_debug()
{
my_log(LOG_DEBUG, "I: received:%d/%d (%lld) pending:%d/%d",
- pkts_total, pkts_total_fragmented, size_total,
- pkts_pending - pkts_pending_done, pending_queue_trace);
+ pkts_total, pkts_total_fragmented, size_total,
+ pkts_pending - pkts_pending_done, pending_queue_trace);
my_log(LOG_DEBUG, "I: ignored:%d lost:%d+%d",
- pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
+ pkts_ignored, pkts_lost_capture, pkts_lost_unpending);
my_log(LOG_DEBUG, "I: cache:%d/%d emit:%d/%d/%d",
- flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
+ flows_total, flows_fragmented, emit_sequence, emit_pkts, emit_queue);
my_log(LOG_DEBUG, "I: memory:%d/%d (%d)",
- total_elements, free_elements, total_memory);
+ total_elements, free_elements, total_memory);
}
#endif
dst->sip = src->sip;
dst->dip = src->dip;
dst->tos = src->tos;
+ dst->xid = src->xid;
dst->proto = src->proto;
dst->tcp_flags = src->tcp_flags;
dst->id = src->id;
dst->flags = src->flags;
}
-void get_cur_epoch() {
-	int fd, len;
-	fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
+/* Read the last epoch id saved in LAST_EPOCH_FILE so that epoch
+ * numbering resumes across restarts. On any failure cur_epoch is
+ * left at -1, which makes the rotation code start from epoch 0. */
+void read_cur_epoch() {
+	int fd;
+	/* Reset to -1 in case the read fails */
+	cur_epoch=-1;
+	fd = open(LAST_EPOCH_FILE, O_RDONLY);
	if (fd != -1) {
-		char snum[7];
-		read(fd, snum, 7);
-		sscanf(snum,"%d",&cur_epoch);
+		char snum[MAX_EPOCH_SIZE];
+		ssize_t len;
+		len = read(fd, snum, MAX_EPOCH_SIZE-1);
+		if (len != -1) {
+			snum[len]='\0';
+			sscanf(snum,"%d",&cur_epoch);
+			cur_epoch++; /* Let's not stone the last epoch */
+		}
+		/* close() unconditionally once open() succeeded: keeping it
+		 * inside the (len != -1) branch leaked the descriptor
+		 * whenever read() failed. */
+		close(fd);
	}
	return;
}
+/* Dumps the current epoch in a file to cope with
+ * reboots and killings of fprobe */
+
void update_cur_epoch_file(int n) {
	int fd, len;
-	char snum[7];
-	len=snprintf(snum,6,"%d",n);
-	fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
+	char snum[MAX_EPOCH_SIZE];
+	len=snprintf(snum, MAX_EPOCH_SIZE-1,"%d", n);
+	/* O_CREAT requires an explicit mode argument; omitting it passes
+	 * indeterminate stack contents as the permission bits. 0644 gives
+	 * the usual rw-r--r--. */
+	fd = open(LAST_EPOCH_FILE, O_WRONLY|O_CREAT|O_TRUNC, 0644);
	if (fd == -1) {
-		my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
+		my_log(LOG_ERR, "open() failed: %s.The next restart will resume logging from epoch id 0.",LAST_EPOCH_FILE);
		return;
	}
	write(fd, snum, len);
	close(fd);
}
-unsigned get_log_fd(char *fname, unsigned cur_fd) {
+/* Get the file descriptor corresponding to the current file.
+ * The kludgy implementation is to abstract away the 'current
+ * file descriptor', which may also be a socket.
+ */
+
+unsigned get_data_file_fd(char *fname, int cur_fd) {
struct Time now;
unsigned cur_uptime;
- /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
- * doesn't solve the problem */
struct statfs statfs;
int ret_fd;
+
+ /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
+ * doesn't solve the problem */
gettime(&now);
cur_uptime = getuptime_minutes(&now);
- if (fstatfs(cur_fd, &statfs) && cur_fd!=START_VALUE) {
- my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
- }
- else {
- if (min_free && statfs.f_bfree < min_free)
- switch(cur_epoch) {
- case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
- my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
- exit(1);
- default:
- my_log(LOG_INFO, "Disk almost full. I'm going to drop data. Max epochs = %d\n",cur_epoch);
- cur_epoch = -1;
+ if (cur_fd != START_DATA_FD) {
+ if (fstatfs(cur_fd, &statfs) == -1) {
+ my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
+ }
+ else {
+ if (min_free && (statfs.f_bavail < min_free)
+ && (cur_epoch==last_peak))
+ {
+ my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
+ cur_epoch = -1;
}
+ /*
+ else
+ assume that we can reclaim space by overwriting our own files
+ and that the difference in size will not fill the disk - sapan
+ */
+ }
}
- /* Epoch length in minutes */
- if ((cur_uptime - prev_uptime) > epoch_length || cur_fd<0 || cur_epoch==-1) {
+ /* If epoch length has been exceeded,
+ * or we're starting up
+ * or we're going back to the first epoch */
+ if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
char nextname[MAX_PATH_LEN];
int write_fd;
prev_uptime = cur_uptime;
cur_epoch = (cur_epoch + 1) % log_epochs;
- close(cur_fd);
+ if (cur_epoch>last_peak) last_peak = cur_epoch;
+ if (cur_fd>0)
+ close(cur_fd);
snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
update_cur_epoch_file(cur_epoch);
ret_fd = write_fd;
}
- else
+ else {
ret_fd = cur_fd;
+ }
return(ret_fd);
}
while (where) {
if (where->sip.s_addr == what->sip.s_addr
- && where->dip.s_addr == what->dip.s_addr
- && where->proto == what->proto) {
+ && where->dip.s_addr == what->dip.s_addr
+ && where->proto == what->proto) {
switch ((what->flags + where->flags) & FLOW_FRAGMASK) {
case 0:
/* Both unfragmented */
if ((what->sp == where->sp)
- && (what->dp == where->dp)) goto done;
+ && (what->dp == where->dp)) goto done;
break;
case 2:
/* Both fragmented */
return where;
}
-int put_into(struct Flow *flow, int flag
+ int put_into(struct Flow *flow, int flag
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- , char *logbuf
+ , char *logbuf
#endif
-)
+ )
{
int ret = 0;
hash_t h;
} else {
#if ((DEBUG) & (DEBUG_S | DEBUG_U)) || defined MESSAGES
my_log(LOG_ERR, "%s %s. %s",
- "mem_alloc():", strerror(errno), "packet lost");
+ "mem_alloc():", strerror(errno), "packet lost");
#endif
return -1;
}
/* Fragmented flow require some additional work */
if (flow->flags & FLOW_TL) {
/*
- ?FIXME?
- Several packets with FLOW_TL (attack)
- */
+ ?FIXME?
+ Several packets with FLOW_TL (attack)
+ */
flown->sp = flow->sp;
flown->dp = flow->dp;
}
if (flow->flags & FLOW_LASTFRAG) {
/*
- ?FIXME?
- Several packets with FLOW_LASTFRAG (attack)
- */
+ ?FIXME?
+ Several packets with FLOW_LASTFRAG (attack)
+ */
flown->sizeP = flow->sizeP;
}
flown->flags |= flow->flags;
flown->sizeF += flow->sizeF;
if ((flown->flags & FLOW_LASTFRAG)
- && (flown->sizeF >= flown->sizeP)) {
+ && (flown->sizeF >= flown->sizeP)) {
/* All fragments received - flow reassembled */
*flowpp = flown->next;
pthread_mutex_unlock(&flows_mutex[h]);
#if ((DEBUG) & (DEBUG_U | DEBUG_S))
, logbuf
#endif
- );
+ );
}
}
if (flag == MOVE_INTO) mem_free(flow);
return ret;
}
+/* File-scope once-flag for the SRC/DST-mask warning in fill();
+ * static: it is not referenced outside this translation unit. */
+static int onlyonce=0;
+
void *fill(int fields, uint16_t *format, struct Flow *flow, void *p)
{
int i;
break;
case NETFLOW_PAD8:
- /* Unsupported (uint8_t) */
+ /* Unsupported (uint8_t) */
case NETFLOW_ENGINE_TYPE:
case NETFLOW_ENGINE_ID:
case NETFLOW_FLAGS7_1:
case NETFLOW_SRC_MASK:
case NETFLOW_DST_MASK:
+			/* Warn once; was "if (onlyonce)", which could never fire
+			 * because onlyonce starts at 0 and is only set here. */
+			if (!onlyonce) {
+				my_log(LOG_CRIT, "Adding SRC/DST masks: this version of fprobe is seriously broken\n");
+				onlyonce=1;
+			}
*((uint8_t *) p) = 0;
p += NETFLOW_PAD8_SIZE;
break;
case NETFLOW_XID:
- *((uint16_t *) p) = flow->tos;
+ *((uint32_t *) p) = flow->xid;
p += NETFLOW_XID_SIZE;
break;
case NETFLOW_PAD16:
- /* Unsupported (uint16_t) */
+ /* Unsupported (uint16_t) */
case NETFLOW_SRC_AS:
case NETFLOW_DST_AS:
case NETFLOW_FLAGS7_2:
break;
case NETFLOW_PAD32:
- /* Unsupported (uint32_t) */
+ /* Unsupported (uint32_t) */
case NETFLOW_IPV4_NEXT_HOP:
case NETFLOW_ROUTER_SC:
*((uint32_t *) p) = 0;
default:
my_log(LOG_CRIT, "fill(): Unknown format at %x[%d]: %d",
- format, i, format[i]);
+ format, i, format[i]);
exit(1);
}
}
void setuser() {
/*
- Workaround for clone()-based threads
- Try to change EUID independently of main thread
- */
+ Workaround for clone()-based threads
+ Try to change EUID independently of main thread
+ */
if (pw) {
setgroups(0, NULL);
setregid(pw->pw_gid, pw->pw_gid);
fflush(stdout);
#endif
if (emit_count == netflow->MaxFlows) {
- sendit:
+sendit:
gettime(&emit_time);
p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
size = netflow->HeaderSize + emit_count * netflow->FlowSize;
/* Netflow PDUs need to be padded to 1464 bytes - Sapan */
+#ifdef STD_NETFLOW_PDU
if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
+#endif
peer_rot_cur = 0;
for (i = 0; i < npeers; i++) {
if (peers[i].type == PEER_FILE) {
- if (netflow->SeqOffset)
- *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
- peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
- ret = write(peers[i].write_fd, emit_packet, size);
- if (ret < size) {
+ if (netflow->SeqOffset)
+ *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
+ peers[i].write_fd = get_data_file_fd(peers[i].fname, peers[i].write_fd);
+ ret = write(peers[i].write_fd, emit_packet, size);
+ if (ret < size) {
#if ((DEBUG) & DEBUG_E) || defined MESSAGES
- my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
+ my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
#endif
#undef MESSAGES
- }
+ }
#if ((DEBUG) & DEBUG_E)
- commaneelse {
- my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
+ commaneelse {
+ my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
emit_count, i + 1, peers[i].seq);
- }
+ }
#endif
- peers[i].seq += emit_count;
-
- /* Rate limit */
- if (emit_rate_bytes) {
- sent += size;
- delay = sent / emit_rate_bytes;
- if (delay) {
- sent %= emit_rate_bytes;
- timeout.tv_sec = 0;
- timeout.tv_nsec = emit_rate_delay * delay;
- while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
- }
+ peers[i].seq += emit_count;
+
+ /* Rate limit */
+ if (emit_rate_bytes) {
+ sent += size;
+ delay = sent / emit_rate_bytes;
+ if (delay) {
+ sent %= emit_rate_bytes;
+ timeout.tv_sec = 0;
+ timeout.tv_nsec = emit_rate_delay * delay;
+ while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
}
}
+ }
else
- if (peers[i].type == PEER_MIRROR) goto sendreal;
- else
- if (peers[i].type == PEER_ROTATE)
- if (peer_rot_cur++ == peer_rot_work) {
- sendreal:
- if (netflow->SeqOffset)
- *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
- ret = send(peers[i].write_fd, emit_packet, size, 0);
- if (ret < size) {
+ if (peers[i].type == PEER_MIRROR) goto sendreal;
+ else
+ if (peers[i].type == PEER_ROTATE)
+ if (peer_rot_cur++ == peer_rot_work) {
+sendreal:
+ if (netflow->SeqOffset)
+ *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[i].seq);
+ ret = send(peers[i].write_fd, emit_packet, size, 0);
+ if (ret < size) {
#if ((DEBUG) & DEBUG_E) || defined MESSAGES
- my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
- i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
+ my_log(LOG_ERR, "send(to #%d, seq %d, flows %d, size %d) == %d: %s",
+ i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
#endif
- }
+ }
#if ((DEBUG) & DEBUG_E)
- commaneelse {
- my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
- emit_count, i + 1, peers[i].seq);
- }
+ commaneelse {
+ my_log(LOG_DEBUG, "E: Emitted %d flow(s) to #%d, seq %d",
+ emit_count, i + 1, peers[i].seq);
+ }
#endif
- peers[i].seq += emit_count;
-
- /* Rate limit */
- if (emit_rate_bytes) {
- sent += size;
- delay = sent / emit_rate_bytes;
- if (delay) {
- sent %= emit_rate_bytes;
- timeout.tv_sec = 0;
- timeout.tv_nsec = emit_rate_delay * delay;
- while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
+ peers[i].seq += emit_count;
+
+ /* Rate limit */
+ if (emit_rate_bytes) {
+ sent += size;
+ delay = sent / emit_rate_bytes;
+ if (delay) {
+ sent %= emit_rate_bytes;
+ timeout.tv_sec = 0;
+ timeout.tv_nsec = emit_rate_delay * delay;
+ while (nanosleep(&timeout, &timeout) == -1 && errno == EINTR);
+ }
+ }
}
- }
- }
}
if (npeers_rot) peer_rot_work = (peer_rot_work + 1) % npeers_rot;
emit_sequence += emit_count;
#endif
if (put_into(pending_tail, COPY_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- , logbuf
+ , logbuf
#endif
- ) < 0) {
+ ) < 0) {
#if ((DEBUG) & DEBUG_I)
pkts_lost_unpending++;
#endif
} else {
/* Flow is not frgamented */
if ((now.sec - flow->mtime.sec) > inactive_lifetime
- || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
+ || (flow->mtime.sec - flow->ctime.sec) > active_lifetime) {
/* Flow expired */
#if ((DEBUG) & DEBUG_S)
my_log(LOG_DEBUG, "S: E %x", flow);
#endif
put_into(flow, MOVE_INTO
#if ((DEBUG) & (DEBUG_S | DEBUG_U))
- , logbuf
+ , logbuf
#endif
- );
+ );
#if ((DEBUG) & DEBUG_S)
my_log(LOG_DEBUG, "%s%s", "S: FE", logbuf);
#endif
#if ((DEBUG) & DEBUG_C) || defined MESSAGES
my_log(LOG_ERR,
# if ((DEBUG) & DEBUG_C)
- "%s %s %s", logbuf,
+ "%s %s %s", logbuf,
# else
- "%s %s",
+ "%s %s",
# endif
- "pending queue full:", "packet lost");
+ "pending queue full:", "packet lost");
#endif
#if ((DEBUG) & DEBUG_I)
pkts_lost_capture++;
flow->sip = nl->ip_src;
flow->dip = nl->ip_dst;
- if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
- my_log(LOG_INFO, "Received test flow to corewars.org");
+ flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
+ /* It's going to be expensive calling this syscall on every flow.
+ * We should keep a local hash table, for now just bear the overhead... - Sapan*/
+ flow->xid = get_vhi_name(ulog_msg->mark);
+ if (flow->xid == -1 || flow->xid == 0)
+ flow->xid = ulog_msg->mark;
+
+ if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
+ my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos);
}
flow->iif = snmp_index(ulog_msg->indev_name);
flow->oif = snmp_index(ulog_msg->outdev_name);
- flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
flow->proto = nl->ip_p;
flow->id = 0;
flow->tcp_flags = 0;
off_frag = (ntohs(nl->ip_off) & IP_OFFMASK) << 3;
/*
- Offset (from network layer) to transport layer header/IP data
- IOW IP header size ;-)
+ Offset (from network layer) to transport layer header/IP data
+ IOW IP header size ;-)
- ?FIXME?
- Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
- */
+ ?FIXME?
+ Check ip_hl for valid value (>=5)? Maybe check CRC? No, thanks...
+ */
off_tl = nl->ip_hl << 2;
tl = (void *) nl + off_tl;
#endif
/*
- Fortunately most interesting transport layer information fit
- into first 8 bytes of IP data field (minimal nonzero size).
- Thus we don't need actual packet reassembling to build whole
- transport layer data. We only check the fragment offset for
- zero value to find packet with this information.
- */
+ Fortunately most interesting transport layer information fit
+ into first 8 bytes of IP data field (minimal nonzero size).
+ Thus we don't need actual packet reassembling to build whole
+ transport layer data. We only check the fragment offset for
+ zero value to find packet with this information.
+ */
if (!off_frag && psize >= 8) {
switch (flow->proto) {
case IPPROTO_TCP:
flow->dp = 0;
break;
- tl_known:
+tl_known:
#if ((DEBUG) & DEBUG_C)
sprintf(buf, " %d>%d", ntohs(flow->sp), ntohs(flow->dp));
strcat(logbuf, buf);
/* Check for tcp flags presence (including CWR and ECE). */
if (flow->proto == IPPROTO_TCP
- && off_frag < 16
- && psize >= 16 - off_frag) {
+ && off_frag < 16
+ && psize >= 16 - off_frag) {
flow->tcp_flags = *((uint8_t *)(tl + 13 - off_frag));
#if ((DEBUG) & DEBUG_C)
sprintf(buf, " TCP:%x", flow->tcp_flags);
/* Flow complete - inform unpending_thread() about it */
pending_head->flags |= FLOW_PENDING;
pending_head = pending_head->next;
- done:
+done:
pthread_cond_signal(&unpending_cond);
}
}
return 0;
}
+/* Copied out of CoDemux */
+
+/* Fork into the background and record the child's pid in PIDFILE,
+ * then detach: new session, fixed working directory, cleared umask.
+ * Returns 0 in the daemonized child, -1 if fork() failed; the parent
+ * never returns (it exits after writing the pid file).
+ * A failed fopen(PIDFILE) is not fatal: we daemonize without a pid
+ * file instead of dereferencing a NULL stream as the old code did. */
+static int init_daemon() {
+  pid_t pid;
+  FILE *pidfile;
+
+  pidfile = fopen(PIDFILE, "w");
+  if (pidfile == NULL) {
+    my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
+  }
+
+  if ((pid = fork()) < 0) {
+    if (pidfile)
+      fclose(pidfile);
+    my_log(LOG_ERR, "Could not fork!\n");
+    return(-1);
+  }
+  else if (pid != 0) {
+    /* i'm the parent, writing down the child pid */
+    if (pidfile) {
+      /* pid_t is a signed int on Linux; %u was a format mismatch */
+      fprintf(pidfile, "%d\n", (int)pid);
+      fclose(pidfile);
+    }
+    exit(0);
+  }
+
+  /* close the pid file */
+  if (pidfile)
+    fclose(pidfile);
+
+  /* routines for any daemon process
+     1. create a new session
+     2. change directory to the root
+     3. change the file creation permission
+  */
+  setsid();
+  chdir("/var/local/fprobe");
+  umask(0);
+
+  return(0);
+}
+
int main(int argc, char **argv)
{
char errpbuf[512];
if (log_suffix) *--log_suffix = ':';
}
if (!(pidfilepath = malloc(sizeof(PID_DIR) + 1 + strlen(ident) + 1 + 3 + 1))) {
- err_malloc:
+err_malloc:
fprintf(stderr, "malloc(): %s\n", strerror(errno));
exit(1);
}
if (parms[rflag].count) {
schedp.sched_priority = atoi(parms[rflag].arg);
if (schedp.sched_priority
- && (schedp.sched_priority < sched_min
- || schedp.sched_priority > sched_max)) {
+ && (schedp.sched_priority < sched_min
+ || schedp.sched_priority > sched_max)) {
fprintf(stderr, "Illegal %s\n", "realtime priority");
exit(1);
}
sscanf(parms[tflag].arg, "%d:%d", &emit_rate_bytes, &emit_rate_delay);
if (parms[aflag].count) {
if (getaddrinfo(parms[aflag].arg, 0, &hints, &res)) {
- bad_lhost:
+bad_lhost:
fprintf(stderr, "Illegal %s\n", "source address");
exit(1);
} else {
if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
-
- peers[npeers].write_fd = START_VALUE;
+
+ peers[npeers].write_fd = START_DATA_FD;
peers[npeers].type = PEER_FILE;
peers[npeers].seq = 0;
- get_cur_epoch();
+ read_cur_epoch();
npeers++;
}
else
ulog_handle = ipulog_create_handle(ulog_gmask, CAPTURE_SIZE);
if (!ulog_handle) {
fprintf(stderr, "libipulog initialization error: %s",
- ipulog_strerror(ipulog_errno));
+ ipulog_strerror(ipulog_errno));
exit(1);
}
if (sockbufsize)
if (setsockopt(ulog_handle->fd, SOL_SOCKET, SO_RCVBUF,
- &sockbufsize, sizeof(sockbufsize)) < 0)
+ &sockbufsize, sizeof(sockbufsize)) < 0)
fprintf(stderr, "setsockopt(): %s", strerror(errno));
/* Daemonize (if log destination stdout-free) */
my_log_open(ident, verbosity, log_dest);
+
+ init_daemon();
+
if (!(log_dest & 2)) {
/* Crash-proofing - Sapan*/
while (1) {
int pid=fork();
if (pid==-1) {
- fprintf(stderr, "fork(): %s", strerror(errno));
- exit(1);
+ fprintf(stderr, "fork(): %s", strerror(errno));
+ exit(1);
}
else if (pid==0) {
- setsid();
- freopen("/dev/null", "r", stdin);
- freopen("/dev/null", "w", stdout);
- freopen("/dev/null", "w", stderr);
- break;
+ setsid();
+ freopen("/dev/null", "r", stdin);
+ freopen("/dev/null", "w", stdout);
+ freopen("/dev/null", "w", stderr);
+ break;
}
else {
while (wait3(NULL,0,NULL) < 1);
gettime(&start_time);
/*
- Build static pending queue as circular buffer.
- */
+ Build static pending queue as circular buffer.
+ */
if (!(pending_head = mem_alloc())) goto err_mem_alloc;
pending_tail = pending_head;
for (i = pending_queue_length - 1; i--;) {
if (!(pending_tail->next = mem_alloc())) {
- err_mem_alloc:
+err_mem_alloc:
my_log(LOG_CRIT, "mem_alloc(): %s", strerror(errno));
exit(1);
}
for (i = 0; i < THREADS - 1; i++) {
if (schedp.sched_priority > 0) {
if ((pthread_attr_setschedpolicy(&tattr, SCHED)) ||
- (pthread_attr_setschedparam(&tattr, &schedp))) {
+ (pthread_attr_setschedparam(&tattr, &schedp))) {
my_log(LOG_CRIT, "pthread_attr_setschedpolicy(): %s", strerror(errno));
exit(1);
}
my_log(LOG_INFO, "pid: %d", pid);
my_log(LOG_INFO, "options: u=%u s=%u g=%u d=%u e=%u n=%u a=%s "
- "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
- ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
- netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
- memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
- emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
- parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
+ "M=%d b=%u m=%u q=%u B=%u r=%u t=%u:%u c=%s u=%s v=%u l=%u%s",
+ ulog_gmask, scan_interval, frag_lifetime, inactive_lifetime, active_lifetime,
+ netflow->Version, inet_ntoa(saddr.sin_addr), mark_is_tos, bulk_quantity,
+ memory_limit >> 10, pending_queue_length, sockbufsize >> 10, schedp.sched_priority - 1,
+ emit_rate_bytes, emit_rate_delay, parms[cflag].count ? parms[cflag].arg : "",
+ parms[uflag].count ? parms[uflag].arg : "", verbosity, log_dest, log_suffix ? log_suffix : "");
for (i = 0; i < nsnmp_rules; i++) {
my_log(LOG_INFO, "SNMP rule #%d %s:%d",
- i + 1, snmp_rules[i].basename, snmp_rules[i].base);
+ i + 1, snmp_rules[i].basename, snmp_rules[i].base);
}
for (i = 0; i < npeers; i++) {
switch (peers[i].type) {
}
snprintf(errpbuf, sizeof(errpbuf), "%s", inet_ntoa(peers[i].laddr.sin_addr));
my_log(LOG_INFO,"collector #%d: %s:%u/%s/%c", i + 1,
- inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
+ inet_ntoa(peers[i].addr.sin_addr), ntohs(peers[i].addr.sin_port), errpbuf, c);
}
pthread_sigmask(SIG_UNBLOCK, &sig_mask, 0);
timeout.tv_usec = 0;
while (!killed
- || (total_elements - free_elements - pending_queue_length)
- || emit_count
- || pending_tail->flags) {
+ || (total_elements - free_elements - pending_queue_length)
+ || emit_count
+ || pending_tail->flags) {
if (!sigs) {
timeout.tv_sec = scan_interval;