modify it under the terms of the GNU General Public License.
$Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $
+
+ Sapan Bhatia <sapanb@cs.princeton.edu>
+
+ 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file
+ rotation.
+
+ 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility.
+
*/
#include <common.h>
/* sig*() */
#include <signal.h>
+/* statfs() */
+
+#include <sys/vfs.h>
+
#include <libipulog/libipulog.h>
struct ipulog_handle {
int fd;
#include <hash.h>
#include <mem.h>
+#define PIDFILE "/var/log/fprobe-ulog.pid"
+#define STD_NETFLOW_PDU
+
enum {
aflag,
Bflag,
bflag,
cflag,
dflag,
+ Dflag,
eflag,
Eflag,
fflag,
Uflag,
uflag,
vflag,
+ Wflag,
Xflag,
};
{'b', MY_GETOPT_ARG_REQUIRED, 0, 0},
{'c', MY_GETOPT_ARG_REQUIRED, 0, 0},
{'d', MY_GETOPT_ARG_REQUIRED, 0, 0},
+ {'D', MY_GETOPT_ARG_REQUIRED, 0, 0},
{'e', MY_GETOPT_ARG_REQUIRED, 0, 0},
{'E', MY_GETOPT_ARG_REQUIRED, 0, 0},
{'f', MY_GETOPT_ARG_REQUIRED, 0, 0},
extern struct NetFlow NetFlow5;
extern struct NetFlow NetFlow7;
+#define START_VALUE -5
#define mark_is_tos parms[Mflag].count
static unsigned scan_interval = 5;
+static int min_free = 0;
static int frag_lifetime = 30;
static int inactive_lifetime = 60;
static int active_lifetime = 300;
"-u <user>\tUser to run as\n"
"-v <level>\tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n"
"-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n"
- "-y <remote:port>\tAddress of the NetFlow collector\n",
+ "-y <remote:port>\tAddress of the NetFlow collector\n"
"-f <writable file>\tFile to write data into\n"
"-T <n>\tRotate log file every n epochs\n"
+ "-W <n>\tSet current epoch to n. Useful when restarting fprobe\n"
"-E <[1..60]>\tSize of an epoch in minutes\n"
+ "-D <number of blocks>\tNumber of disk blocks to preserve as free space\n"
+ ,
VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max);
exit(0);
}
now->usec = t.tv_usec;
}
+
+inline time_t cmpMtime(struct Time *t1, struct Time *t2)
+{
+ return (t1->sec - t2->sec)/60;
+}
+
inline time_t cmpmtime(struct Time *t1, struct Time *t2)
{
return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000;
return cmpmtime(t, &start_time);
}
+/* Uptime in minutes */
+uint32_t getuptime_minutes(struct Time *t)
+{
+ /* Maximum uptime is about 49/2 days */
+ return cmpMtime(t, &start_time);
+}
+
hash_t hash_flow(struct Flow *flow)
{
if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F));
dst->flags = src->flags;
}
-unsigned get_log_fd(char *fname, unsigned cur_fd) {
+void get_cur_epoch() {
+ int fd;
+ fd = open("/tmp/fprobe_last_epoch",O_RDONLY);
+ if (fd != -1) {
+ char snum[7];
+ ssize_t len;
+ len = read(fd, snum, sizeof(snum)-1);
+ if (len != -1) {
+ snum[len]='\0';
+ sscanf(snum,"%d",&cur_epoch);
+ cur_epoch++; /* Let's not stone the last epoch */
+ close(fd);
+ }
+ }
+ return;
+}
+
+
+void update_cur_epoch_file(int n) {
+ int fd, len;
+ char snum[7];
+ len=snprintf(snum,6,"%d",n);
+ fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC);
+ if (fd == -1) {
+ my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0.");
+ return;
+ }
+ write(fd, snum, len);
+ close(fd);
+}
+
+unsigned get_log_fd(char *fname, int cur_fd) {
struct Time now;
unsigned cur_uptime;
+ /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that
+ * doesn't solve the problem */
+
+ struct statfs statfs;
int ret_fd;
gettime(&now);
- cur_uptime = getuptime(&now);
+ cur_uptime = getuptime_minutes(&now);
+
+
+ if (cur_fd!=START_VALUE) {
+ if (fstatfs(cur_fd, &statfs) == -1) {
+ my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks");
+ }
+ else {
+ if (min_free && (statfs.f_bavail < min_free))
+ switch(cur_epoch) {
+ case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */
+ my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out.");
+ exit(1);
+ default:
+ my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch);
+ cur_epoch = -1;
+ }
+ }
+ }
- /* Epoch lenght in minutes */
- if ((cur_uptime - prev_uptime) > (1000 * 60 * epoch_length)) {
+ /* Epoch length in minutes */
+ if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) {
char nextname[MAX_PATH_LEN];
int write_fd;
prev_uptime = cur_uptime;
cur_epoch = (cur_epoch + 1) % log_epochs;
- close(cur_fd);
+ if (cur_fd>0)
+ close(cur_fd);
snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch);
- if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) {
- fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno));
+ if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) {
+ my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno));
exit(1);
}
+ update_cur_epoch_file(cur_epoch);
ret_fd = write_fd;
}
- else
+ else {
ret_fd = cur_fd;
+ }
return(ret_fd);
}
case NETFLOW_IPV4_DST_ADDR:
((struct in_addr *) p)->s_addr = flow->dip.s_addr;
+ if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) {
+ my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts);
+ }
p += NETFLOW_IPV4_DST_ADDR_SIZE;
break;
*((uint8_t *) p) = 0;
p += NETFLOW_PAD8_SIZE;
break;
- case NETFLOW_PLANETLAB_XID:
+ case NETFLOW_XID:
*((uint16_t *) p) = flow->tos;
- p += NETFLOW_PLANETLAB_XID_SIZE;
+ p += NETFLOW_XID_SIZE;
break;
case NETFLOW_PAD16:
/* Unsupported (uint16_t) */
p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet);
size = netflow->HeaderSize + emit_count * netflow->FlowSize;
/* Netflow PDUs need to be padded to 1464 bytes - Sapan */
+#ifdef STD_NETFLOW_PDU
if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE;
+#endif
peer_rot_cur = 0;
for (i = 0; i < npeers; i++) {
- if (peers[0].type == PEER_FILE) {
+ if (peers[i].type == PEER_FILE) {
if (netflow->SeqOffset)
*((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq);
-#define MESSAGES
- peers[0].write_fd = get_log_fd(peers[0].fname, peers[0].write_fd);
- ret = write(peers[0].write_fd, emit_packet, size);
+ peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd);
+ ret = write(peers[i].write_fd, emit_packet, size);
if (ret < size) {
+
#if ((DEBUG) & DEBUG_E) || defined MESSAGES
my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s",
i + 1, peers[i].seq, emit_count, size, ret, strerror(errno));
emit_count, i + 1, peers[i].seq);
}
#endif
- peers[0].seq += emit_count;
+ peers[i].seq += emit_count;
/* Rate limit */
if (emit_rate_bytes) {
flow->sip = nl->ip_src;
flow->dip = nl->ip_dst;
+ flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
+ if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) {
+ my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos);
+ }
flow->iif = snmp_index(ulog_msg->indev_name);
flow->oif = snmp_index(ulog_msg->outdev_name);
- flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos;
flow->proto = nl->ip_p;
flow->id = 0;
flow->tcp_flags = 0;
return 0;
}
+/* Copied out of CoDemux */
+
+static int init_daemon() {
+ pid_t pid;
+ FILE *pidfile;
+
+ pidfile = fopen(PIDFILE, "w");
+ if (pidfile == NULL) {
+ my_log(LOG_ERR, "%s creation failed\n", PIDFILE);
+ }
+
+ if ((pid = fork()) < 0) {
+ fclose(pidfile);
+ my_log(LOG_ERR, "Could not fork!\n");
+ return(-1);
+ }
+ else if (pid != 0) {
+ /* i'm the parent, writing down the child pid */
+ fprintf(pidfile, "%u\n", pid);
+ fclose(pidfile);
+ exit(0);
+ }
+
+ /* close the pid file */
+ fclose(pidfile);
+
+ /* routines for any daemon process
+ 1. create a new session
+ 2. change directory to the root
+ 3. change the file creation permission
+ */
+ setsid();
+ chdir("/usr/local/fprobe");
+ umask(0);
+
+ return(0);
+}
+
int main(int argc, char **argv)
{
char errpbuf[512];
}
if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg);
+ if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg);
if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg);
if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg);
if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg);
if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg);
if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg);
+ if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg);
if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg);
if (parms[nflag].count) {
switch (atoi(parms[nflag].arg)) {
else if (parms[fflag].count) {
// log into a file
if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc;
- if (!(peers[0].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
- strncpy(peers[0].fname, parms[fflag].arg, MAX_PATH_LEN);
+ if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc;
+ strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN);
- peers[0].write_fd = -1;
- peers[0].type = PEER_FILE;
- peers[0].seq = 0;
+ peers[npeers].write_fd = START_VALUE;
+ peers[npeers].type = PEER_FILE;
+ peers[npeers].seq = 0;
+
+ get_cur_epoch();
npeers++;
}
else
/* Daemonize (if log destination stdout-free) */
my_log_open(ident, verbosity, log_dest);
- if (!(log_dest & 2)) {
- switch (fork()) {
- case -1:
- fprintf(stderr, "fork(): %s", strerror(errno));
- exit(1);
- case 0:
- setsid();
- freopen("/dev/null", "r", stdin);
- freopen("/dev/null", "w", stdout);
- freopen("/dev/null", "w", stderr);
- break;
+ init_daemon();
- default:
- exit(0);
+ if (!(log_dest & 2)) {
+ /* Crash-proofing - Sapan*/
+ while (1) {
+ int pid=fork();
+ if (pid==-1) {
+ fprintf(stderr, "fork(): %s", strerror(errno));
+ exit(1);
+ }
+ else if (pid==0) {
+ setsid();
+ freopen("/dev/null", "r", stdin);
+ freopen("/dev/null", "w", stdout);
+ freopen("/dev/null", "w", stderr);
+ break;
+ }
+ else {
+ while (wait3(NULL,0,NULL) < 1);
+ }
}
} else {
setvbuf(stdout, (char *)0, _IONBF, 0);