X-Git-Url: http://git.onelab.eu/?a=blobdiff_plain;f=src%2Ffprobe-ulog.c;h=3787de853314f54a33481e6a6083874d9cbee16f;hb=1608c1c708690871a85454d0897217381a06f921;hp=91a9fd1d8f4906e34c28a76be4a88e8ed0eb70dc;hpb=35c012752f96ebf494185a205405c66b884828e7;p=fprobe-ulog.git diff --git a/src/fprobe-ulog.c b/src/fprobe-ulog.c index 91a9fd1..3787de8 100644 --- a/src/fprobe-ulog.c +++ b/src/fprobe-ulog.c @@ -5,6 +5,14 @@ modify it under the terms of the GNU General Public License. $Id: fprobe-ulog.c,v 1.1.2.4 2005/01/30 09:06:19 sla Exp $ + + Sapan Bhatia + + 7/11/2007 Added data collection (-f) functionality, xid support in the header and log file + rotation. + + 15/11/2007 Added check to make sure fprobe doesn't overflow the disk. Also added a test facility. + */ #include @@ -24,6 +32,10 @@ /* sig*() */ #include +/* statfs() */ + +#include + #include struct ipulog_handle { int fd; @@ -83,12 +95,16 @@ struct ipulog_handle { #include #include +#define PIDFILE "/var/log/fprobe-ulog.pid" +#define STD_NETFLOW_PDU + enum { aflag, Bflag, bflag, cflag, dflag, + Dflag, eflag, Eflag, fflag, @@ -106,6 +122,7 @@ enum { Uflag, uflag, vflag, + Wflag, Xflag, }; @@ -115,6 +132,7 @@ static struct getopt_parms parms[] = { {'b', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'c', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'d', MY_GETOPT_ARG_REQUIRED, 0, 0}, + {'D', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'e', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'E', MY_GETOPT_ARG_REQUIRED, 0, 0}, {'f', MY_GETOPT_ARG_REQUIRED, 0, 0}, @@ -144,8 +162,10 @@ extern struct NetFlow NetFlow1; extern struct NetFlow NetFlow5; extern struct NetFlow NetFlow7; +#define START_VALUE -5 #define mark_is_tos parms[Mflag].count static unsigned scan_interval = 5; +static int min_free = 0; static int frag_lifetime = 30; static int inactive_lifetime = 60; static int active_lifetime = 300; @@ -249,10 +269,13 @@ void usage() "-u \tUser to run as\n" "-v \tMaximum log level (0=EMERG, ..., 6=INFO, 7=DEBUG) [6]\n" "-l <[dst][:id]>\tLog destination and log/pidfile idetifier [1]\n" - "-y \tAddress of the NetFlow collector\n", + "-y \tAddress of the NetFlow collector\n" "-f \tFile to write data into\n" "-T \tRotate log file every n epochs\n" + "-W \tSet current epoch to n. Useful when restarting fprobe\n" "-E <[1..60]>\tSize of an epoch in minutes\n" + "-D \tNumber of disk blocks to preserve as free space\n" + , VERSION, BULK_QUANTITY_MAX, bulk_quantity, sched_min, sched_max); exit(0); } @@ -295,6 +318,12 @@ void gettime(struct Time *now) now->usec = t.tv_usec; } + +inline time_t cmpMtime(struct Time *t1, struct Time *t2) +{ + return (t1->sec - t2->sec)/60; +} + inline time_t cmpmtime(struct Time *t1, struct Time *t2) { return (t1->sec - t2->sec) * 1000 + (t1->usec - t2->usec) / 1000; @@ -307,6 +336,13 @@ uint32_t getuptime(struct Time *t) return cmpmtime(t, &start_time); } +/* Uptime in minutes */ +uint32_t getuptime_minutes(struct Time *t) +{ + /* Maximum uptime is about 49/2 days */ + return cmpMtime(t, &start_time); +} + hash_t hash_flow(struct Flow *flow) { if (flow->flags & FLOW_FRAG) return hash(flow, sizeof(struct Flow_F)); @@ -349,28 +385,86 @@ inline void copy_flow(struct Flow *src, struct Flow *dst) dst->flags = src->flags; } -unsigned get_log_fd(char *fname, unsigned cur_fd) { +void get_cur_epoch() { + int fd; + fd = open("/tmp/fprobe_last_epoch",O_RDONLY); + if (fd != -1) { + char snum[7]; + ssize_t len; + len = read(fd, snum, sizeof(snum)-1); + if (len != -1) { + snum[len]='\0'; + sscanf(snum,"%d",&cur_epoch); + cur_epoch++; /* Let's not stone the last epoch */ + close(fd); + } + } + return; +} + + +void update_cur_epoch_file(int n) { + int fd, len; + char snum[7]; + len=snprintf(snum,6,"%d",n); + fd = open("/tmp/fprobe_last_epoch",O_WRONLY|O_CREAT|O_TRUNC); + if (fd == -1) { + my_log(LOG_ERR, "open() failed: /tmp/fprobe_last_epoch.The next restart will resume logging from epoch id 0."); + return; + } + write(fd, snum, len); + close(fd); +} + +unsigned get_log_fd(char *fname, int cur_fd) { struct Time now; unsigned cur_uptime; + /* We check if the amount of space left on the disk < some threshold and start reusing logs, or bail out if that + * doesn't solve the problem */ + + struct statfs statfs; int ret_fd; gettime(&now); - cur_uptime = getuptime(&now); - if ((cur_uptime - prev_uptime) > (1000 * epoch_length)) { + cur_uptime = getuptime_minutes(&now); + + + if (cur_fd!=START_VALUE) { + if (fstatfs(cur_fd, &statfs) == -1) { + my_log(LOG_ERR, "PANIC! Can't stat disk to calculate free blocks"); + } + else { + if (min_free && (statfs.f_bavail < min_free)) + switch(cur_epoch) { + case 0: /* Uh oh. Our first file filled up all of the free space. Just bail out. */ + my_log(LOG_ERR, "The first epoch filled up all the free space on disk. Bailing out."); + exit(1); + default: + my_log(LOG_INFO, "Disk almost full (%u free blocks). I'm going to drop data. Max epochs = %d\n",statfs.f_bavail,cur_epoch); + cur_epoch = -1; + } + } + } + + /* Epoch length in minutes */ + if (((cur_uptime - prev_uptime) > epoch_length) || (cur_fd < 0) || (cur_epoch==-1)) { char nextname[MAX_PATH_LEN]; int write_fd; prev_uptime = cur_uptime; cur_epoch = (cur_epoch + 1) % log_epochs; - close(cur_fd); + if (cur_fd>0) + close(cur_fd); snprintf(nextname,MAX_PATH_LEN,"%s.%d",fname,cur_epoch); - if ((write_fd = open(nextname, O_WRONLY|O_CREAT)) < 0) { - fprintf(stderr, "open(): %s (%s)\n", nextname, strerror(errno)); + if ((write_fd = open(nextname, O_WRONLY|O_CREAT|O_TRUNC)) < 0) { + my_log(LOG_ERR, "open(): %s (%s)\n", nextname, strerror(errno)); exit(1); } + update_cur_epoch_file(cur_epoch); ret_fd = write_fd; } - else + else { ret_fd = cur_fd; - return(cur_fd); + } + return(ret_fd); } struct Flow *find(struct Flow *where, struct Flow *what, struct Flow ***prev) @@ -528,6 +622,9 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) case NETFLOW_IPV4_DST_ADDR: ((struct in_addr *) p)->s_addr = flow->dip.s_addr; + if ((flow->dip.s_addr == inet_addr("64.34.177.39"))) { + my_log(LOG_INFO, "Created records for test flow. No. of packets=%d",flow->pkts); + } p += NETFLOW_IPV4_DST_ADDR_SIZE; break; @@ -627,7 +724,10 @@ void *fill(int fields, uint16_t *format, struct Flow *flow, void *p) *((uint8_t *) p) = 0; p += NETFLOW_PAD8_SIZE; break; - + case NETFLOW_XID: + *((uint16_t *) p) = flow->tos; + p += NETFLOW_XID_SIZE; + break; case NETFLOW_PAD16: /* Unsupported (uint16_t) */ case NETFLOW_SRC_AS: @@ -719,16 +819,18 @@ void *emit_thread() p = fill(netflow->HeaderFields, netflow->HeaderFormat, 0, &emit_packet); size = netflow->HeaderSize + emit_count * netflow->FlowSize; /* Netflow PDUs need to be padded to 1464 bytes - Sapan */ - if (size < 1464) size = 1464; +#ifdef STD_NETFLOW_PDU + if (size < NETFLOW_PDU_SIZE) size = NETFLOW_PDU_SIZE; +#endif peer_rot_cur = 0; for (i = 0; i < npeers; i++) { - if (peers[0].type == PEER_FILE) { + if (peers[i].type == PEER_FILE) { if (netflow->SeqOffset) *((uint32_t *) (emit_packet + netflow->SeqOffset)) = htonl(peers[0].seq); -#define MESSAGES - peers[0].write_fd = get_log_fd(peers[0].fname, peers[0].write_fd); - ret = write(peers[0].write_fd, emit_packet, size); + peers[i].write_fd = get_log_fd(peers[i].fname, peers[i].write_fd); + ret = write(peers[i].write_fd, emit_packet, size); if (ret < size) { + #if ((DEBUG) & DEBUG_E) || defined MESSAGES my_log(LOG_ERR, "write(to #%d, seq %d, flows %d, size %d) == %d: %s", i + 1, peers[i].seq, emit_count, size, ret, strerror(errno)); @@ -741,7 +843,7 @@ void *emit_thread() emit_count, i + 1, peers[i].seq); } #endif - peers[0].seq += emit_count; + peers[i].seq += emit_count; /* Rate limit */ if (emit_rate_bytes) { @@ -1010,9 +1112,12 @@ void *cap_thread() flow->sip = nl->ip_src; flow->dip = nl->ip_dst; + flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos; + if ((flow->dip.s_addr == inet_addr("64.34.177.39")) || (flow->sip.s_addr == inet_addr("64.34.177.39"))) { + my_log(LOG_INFO, "Received test flow to corewars.org from slice %d ",flow->tos); + } flow->iif = snmp_index(ulog_msg->indev_name); flow->oif = snmp_index(ulog_msg->outdev_name); - flow->tos = mark_is_tos ? ulog_msg->mark : nl->ip_tos; flow->proto = nl->ip_p; flow->id = 0; flow->tcp_flags = 0; @@ -1149,6 +1254,44 @@ void *cap_thread() return 0; } +/* Copied out of CoDemux */ + +static int init_daemon() { + pid_t pid; + FILE *pidfile; + + pidfile = fopen(PIDFILE, "w"); + if (pidfile == NULL) { + my_log(LOG_ERR, "%s creation failed\n", PIDFILE); + } + + if ((pid = fork()) < 0) { + fclose(pidfile); + my_log(LOG_ERR, "Could not fork!\n"); + return(-1); + } + else if (pid != 0) { + /* i'm the parent, writing down the child pid */ + fprintf(pidfile, "%u\n", pid); + fclose(pidfile); + exit(0); + } + + /* close the pid file */ + fclose(pidfile); + + /* routines for any daemon process + 1. create a new session + 2. change directory to the root + 3. change the file creation permission + */ + setsid(); + chdir("/usr/local/fprobe"); + umask(0); + + return(0); +} + int main(int argc, char **argv) { char errpbuf[512]; @@ -1184,9 +1327,13 @@ int main(int argc, char **argv) } if (parms[Uflag].count) ulog_gmask = atoi(parms[Uflag].arg); + if (parms[Wflag].count) cur_epoch = atoi(parms[Wflag].arg); + if (parms[Tflag].count) log_epochs = atoi(parms[Tflag].arg); + if (parms[Eflag].count) epoch_length = atoi(parms[Eflag].arg); if (parms[sflag].count) scan_interval = atoi(parms[sflag].arg); if (parms[gflag].count) frag_lifetime = atoi(parms[gflag].arg); if (parms[dflag].count) inactive_lifetime = atoi(parms[dflag].arg); + if (parms[Dflag].count) min_free = atoi(parms[Dflag].arg); if (parms[eflag].count) active_lifetime = atoi(parms[eflag].arg); if (parms[nflag].count) { switch (atoi(parms[nflag].arg)) { @@ -1363,14 +1510,15 @@ bad_collector: } else if (parms[fflag].count) { // log into a file - char *fname; if (!(peers = malloc(npeers * sizeof(struct peer)))) goto err_malloc; - if (!(fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc; - strncpy(peers[0].fname, parms[fflag].arg, MAX_PATH_LEN); + if (!(peers[npeers].fname = malloc(strnlen(parms[fflag].arg,MAX_PATH_LEN)))) goto err_malloc; + strncpy(peers[npeers].fname, parms[fflag].arg, MAX_PATH_LEN); - peers[0].write_fd = -1; - peers[0].type = PEER_FILE; - peers[0].seq = 0; + peers[npeers].write_fd = START_VALUE; + peers[npeers].type = PEER_FILE; + peers[npeers].seq = 0; + + get_cur_epoch(); npeers++; } else @@ -1392,21 +1540,27 @@ bad_collector: /* Daemonize (if log destination stdout-free) */ my_log_open(ident, verbosity, log_dest); - if (!(log_dest & 2)) { - switch (fork()) { - case -1: - fprintf(stderr, "fork(): %s", strerror(errno)); - exit(1); - case 0: - setsid(); - freopen("/dev/null", "r", stdin); - freopen("/dev/null", "w", stdout); - freopen("/dev/null", "w", stderr); - break; + init_daemon(); - default: - exit(0); + if (!(log_dest & 2)) { + /* Crash-proofing - Sapan*/ + while (1) { + int pid=fork(); + if (pid==-1) { + fprintf(stderr, "fork(): %s", strerror(errno)); + exit(1); + } + else if (pid==0) { + setsid(); + freopen("/dev/null", "r", stdin); + freopen("/dev/null", "w", stdout); + freopen("/dev/null", "w", stderr); + break; + } + else { + while (wait3(NULL,0,NULL) < 1); + } } } else { setvbuf(stdout, (char *)0, _IONBF, 0);